@@ -213,6 +213,15 @@ impl Worker {
213213 if let Err ( e) = self . flush_all ( ) {
214214 warn ! ( "Worker {} final flush error: {}" , self . id, e) ;
215215 }
216+ // Force-close any windows still open after the final flush.
217+ // `flush_all` only advances the watermark by +1ms (plus the
218+ // wall-clock fallback, whose grace may not have elapsed for a
219+ // one-shot batch), so the trailing window can remain open and
220+ // its data would never reach the store. No more samples will
221+ // arrive after shutdown, so close every remaining pane.
222+ if let Err ( e) = self . force_close_all ( ) {
223+ warn ! ( "Worker {} shutdown force-close error: {}" , self . id, e) ;
224+ }
216225 break ;
217226 }
218227 WorkerMessage :: UpdateAggConfigs ( new_configs) => {
@@ -635,6 +644,85 @@ impl Worker {
635644 Ok ( ( ) )
636645 }
637646
647+ /// Force-close every window still open on shutdown.
648+ ///
649+ /// Unlike `flush_all` — which only advances the watermark by `+1ms` (plus
650+ /// the wall-clock fallback, gated on grace having elapsed) — this emits the
651+ /// window for every remaining pane unconditionally, because no further
652+ /// samples will arrive once the engine is shutting down. Without it, a
653+ /// one-shot batch whose records all fall in a single window (so event-time
654+ /// never advances past the window end) would leave that window open forever
655+ /// and never write it to the store.
656+ ///
657+ /// To advance past the open windows we use a *finite* bound derived from
658+ /// the largest open pane (`max_pane + window_size_ms`) rather than
659+ /// `i64::MAX`: `WindowManager::closed_windows` enumerates window starts up
660+ /// to `current_wm` one slide at a time, so passing `i64::MAX` would loop
661+ /// ~`i64::MAX / slide` times and overflow. `max_pane + window_size_ms` is
662+ /// the smallest watermark that closes the latest open window.
663+ ///
664+ /// Idempotent: closed panes are drained from `active_panes` and their
665+ /// wall-clock bookkeeping is pruned, so a second call emits nothing.
666+ fn force_close_all ( & mut self ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
667+ if self . pass_raw_samples {
668+ return Ok ( ( ) ) ;
669+ }
670+
671+ let mut emit_batch: Vec < ( PrecomputedOutput , Box < dyn AggregateCore > ) > = Vec :: new ( ) ;
672+
673+ for ( agg_id, inner) in & mut self . group_states {
674+ for ( group_key, state) in inner. iter_mut ( ) {
675+ if state. previous_watermark_ms == i64:: MIN {
676+ continue ; // never received data — nothing to close
677+ }
678+ // The latest window start equals the largest open pane start;
679+ // closing window `[start, start + size)` needs `wm >= start + size`.
680+ let Some ( & max_pane) = state. active_panes . keys ( ) . next_back ( ) else {
681+ continue ; // no open panes
682+ } ;
683+ let force_wm = max_pane. saturating_add ( state. window_manager . window_size_ms ( ) ) ;
684+
685+ let closed = state
686+ . window_manager
687+ . closed_windows ( state. previous_watermark_ms , force_wm) ;
688+
689+ for window_start in & closed {
690+ let ( _, window_end) = state. window_manager . window_bounds ( * window_start) ;
691+ let pane_starts = state. window_manager . panes_for_window ( * window_start) ;
692+
693+ if let Some ( accumulator) =
694+ merge_panes_for_window ( & mut state. active_panes , & pane_starts)
695+ {
696+ let key = build_group_key_label_values ( group_key) ;
697+ let output = PrecomputedOutput :: new (
698+ * window_start as u64 ,
699+ window_end as u64 ,
700+ Some ( key) ,
701+ * agg_id,
702+ ) ;
703+ emit_batch. push ( ( output, accumulator) ) ;
704+ }
705+ }
706+
707+ if force_wm > state. previous_watermark_ms {
708+ state. previous_watermark_ms = force_wm;
709+ }
710+ state. prune_pane_wall_clock_starts ( ) ;
711+ }
712+ }
713+
714+ if !emit_batch. is_empty ( ) {
715+ debug ! (
716+ "Worker {} shutdown force-close emitting {} outputs" ,
717+ self . id,
718+ emit_batch. len( )
719+ ) ;
720+ self . output_sink . emit_batch ( emit_batch) ?;
721+ }
722+
723+ Ok ( ( ) )
724+ }
725+
638726 /// Compute the global watermark as min(all worker watermarks), ignoring
639727 /// workers that haven't started yet (still at i64::MIN).
640728 fn compute_global_watermark ( & self ) -> i64 {
@@ -2253,11 +2341,18 @@ aggregations:
22532341 tx. send ( WorkerMessage :: Shutdown ) . await . unwrap ( ) ;
22542342 handle. await . unwrap ( ) ;
22552343
2256- let captured = sink. drain ( ) ;
2344+ let mut captured = sink. drain ( ) ;
2345+ // Two windows close:
2346+ // 1. [0, 10_000) — closed inline when the t=10_000 sample advanced
2347+ // the watermark; contains only the post-update t=5_000 sample.
2348+ // 2. [10_000, 20_000) — left open by the watermark but force-closed on
2349+ // shutdown; contains the t=10_000 sample. Before the shutdown
2350+ // force-close this trailing window was silently lost.
2351+ captured. sort_by_key ( |( o, _) | o. start_timestamp ) ;
22572352 assert_eq ! (
22582353 captured. len( ) ,
2259- 1 ,
2260- "one window should close after UpdateAggConfigs "
2354+ 2 ,
2355+ "window [0,10_000) closes inline; [10_000,20_000) force-closes on shutdown "
22612356 ) ;
22622357
22632358 let ( output, acc) = & captured[ 0 ] ;
@@ -2270,12 +2365,27 @@ aggregations:
22702365 . downcast_ref :: < SumAccumulator > ( )
22712366 . expect ( "should be SumAccumulator" ) ;
22722367 // Pre-update sample (t=1000, val=1.0) was dropped — agg_id was unknown.
2273- // Post-update sample (t=5000, val=2.0) is the only one aggregated .
2368+ // Post-update sample (t=5000, val=2.0) is the only one in this window .
22742369 assert ! (
22752370 ( sum_acc. sum - 2.0 ) . abs( ) < 1e-10 ,
22762371 "only post-update sample should be aggregated, got {}" ,
22772372 sum_acc. sum
22782373 ) ;
2374+
2375+ // The trailing window force-closed on shutdown holds the t=10_000
2376+ // sample (val=0.0).
2377+ let ( trailing_output, trailing_acc) = & captured[ 1 ] ;
2378+ assert_eq ! ( trailing_output. start_timestamp, 10_000 ) ;
2379+ assert_eq ! ( trailing_output. end_timestamp, 20_000 ) ;
2380+ let trailing_sum = trailing_acc
2381+ . as_any ( )
2382+ . downcast_ref :: < SumAccumulator > ( )
2383+ . expect ( "should be SumAccumulator" ) ;
2384+ assert ! (
2385+ trailing_sum. sum. abs( ) < 1e-10 ,
2386+ "trailing window should hold the t=10_000 sample (val=0.0), got {}" ,
2387+ trailing_sum. sum
2388+ ) ;
22792389 }
22802390
22812391 // -----------------------------------------------------------------------
@@ -2424,4 +2534,74 @@ aggregations:
24242534 "grace=0 must disable the fallback — event-time-only semantics"
24252535 ) ;
24262536 }
2537+
2538+ // -----------------------------------------------------------------------
2539+ // Test: shutdown force-close emits the trailing window
2540+ //
2541+ // The immediate-shutdown batch case: every record falls in one window and
2542+ // no later timestamp ever advances the watermark, so flush_all (with the
2543+ // wall-clock fallback disabled, grace=0) leaves the window open. On
2544+ // shutdown, force_close_all must close and emit it so the data reaches the
2545+ // store instead of being lost.
2546+ // -----------------------------------------------------------------------
2547+
2548+ #[ test]
2549+ fn shutdown_force_close_emits_trailing_window ( ) {
2550+ // 10s tumbling window; grace=0 isolates the force-close from the
2551+ // wall-clock fallback.
2552+ let cfg = make_agg_config (
2553+ 7 ,
2554+ "cpu" ,
2555+ AggregationType :: SingleSubpopulation ,
2556+ "Sum" ,
2557+ 10 ,
2558+ 0 ,
2559+ vec ! [ ] ,
2560+ ) ;
2561+ let agg_configs = HashMap :: from ( [ ( 7 , cfg) ] ) ;
2562+ let sink = Arc :: new ( CapturingOutputSink :: new ( ) ) ;
2563+ let mut worker = make_worker_with_grace ( agg_configs, sink. clone ( ) , 0 ) ;
2564+
2565+ // All samples land in window [0, 10_000); the watermark freezes below
2566+ // the window end because no later timestamp ever arrives.
2567+ for i in 0 ..5 {
2568+ worker
2569+ . process_group_samples (
2570+ 7 ,
2571+ "" ,
2572+ group_samples ( "cpu" , vec ! [ ( 1_000 + i as i64 * 100 , 1.0 ) ] ) ,
2573+ )
2574+ . unwrap ( ) ;
2575+ }
2576+
2577+ // A final flush must NOT close the window (event-time frozen, fallback
2578+ // disabled) — this is the bug the force-close fixes.
2579+ worker. flush_all ( ) . unwrap ( ) ;
2580+ assert_eq ! (
2581+ sink. len( ) ,
2582+ 0 ,
2583+ "trailing window must remain open after the final flush"
2584+ ) ;
2585+
2586+ // Shutdown force-close closes and emits the trailing window.
2587+ worker. force_close_all ( ) . unwrap ( ) ;
2588+ let captured = sink. drain ( ) ;
2589+ assert_eq ! (
2590+ captured. len( ) ,
2591+ 1 ,
2592+ "shutdown force-close must emit the trailing window"
2593+ ) ;
2594+ let ( output, _acc) = & captured[ 0 ] ;
2595+ assert_eq ! ( output. aggregation_id, 7 ) ;
2596+ assert_eq ! ( output. start_timestamp, 0 ) ;
2597+ assert_eq ! ( output. end_timestamp, 10_000 ) ;
2598+
2599+ // Idempotent: panes are drained, so a second force-close emits nothing.
2600+ worker. force_close_all ( ) . unwrap ( ) ;
2601+ assert_eq ! (
2602+ sink. len( ) ,
2603+ 0 ,
2604+ "force-close must be idempotent once panes are drained"
2605+ ) ;
2606+ }
24272607}
0 commit comments