Skip to content

Commit 6807dbc

Browse files
zzylolclaude
andcommitted
test(precompute): e2e regression for one-shot NetFlow single-second ingest
Drives the real JsonFileIngestSource -> PrecomputeEngine -> sink path with a synthesized NetFlow JSONL whose records all fall in the same second. Sets wall_clock_grace_period_ms=0 so only the shutdown force-close can close the trailing window — a non-empty sink proves the force-close rescues the batch. Verified both directions: with the Shutdown force-close the single-second window is emitted with the summed bytes; with it disabled the run produces 0 outputs (reproducing the reported empty-store bug). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 31692e5 commit 6807dbc

1 file changed

Lines changed: 161 additions & 0 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! End-to-end regression test for the one-shot NetFlow ingest failure mode.
2+
//!
3+
//! Scenario (reported in the field): a one-shot JSON ingest of NetFlow records
4+
//! whose timestamps all fall within the **same second**. Every record lands in
5+
//! a single 1-second tumbling window and no later timestamp ever arrives to
6+
//! advance the watermark, so the window never closes via event-time. Before the
7+
//! shutdown force-close, the store stayed empty even after the worker shut down.
8+
//!
9+
//! This test drives the real `JsonFileIngestSource → PrecomputeEngine → sink`
10+
//! path. Crucially it sets `wall_clock_grace_period_ms = 0`, disabling the
11+
//! wall-clock fallback, so the *only* thing that can close the window is the
12+
//! shutdown force-close. A non-empty sink therefore proves the force-close is
13+
//! what rescues the one-shot batch.
14+
15+
use std::collections::HashMap;
16+
use std::io::Write;
17+
use std::sync::Arc;
18+
19+
use asap_types::aggregation_config::AggregationConfig;
20+
use asap_types::enums::{AggregationType, WindowType};
21+
22+
use query_engine_rust::data_model::StreamingConfig;
23+
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
24+
use query_engine_rust::precompute_engine::output_sink::CapturingOutputSink;
25+
use query_engine_rust::precompute_engine::{
26+
IngestSource, JsonFileIngestConfig, JsonFileIngestSource, PrecomputeEngine, TimestampUnit,
27+
};
28+
use query_engine_rust::precompute_operators::sum_accumulator::SumAccumulator;
29+
30+
fn netflow_agg_config(metric: &str, window_secs: u64) -> AggregationConfig {
31+
AggregationConfig::new(
32+
1,
33+
AggregationType::SingleSubpopulation,
34+
"Sum".to_string(),
35+
HashMap::new(),
36+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]),
37+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]),
38+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]),
39+
String::new(),
40+
window_secs,
41+
0,
42+
WindowType::Tumbling,
43+
metric.to_string(),
44+
metric.to_string(),
45+
None,
46+
None,
47+
None,
48+
None,
49+
)
50+
}
51+
52+
fn engine_config_grace_disabled() -> PrecomputeEngineConfig {
53+
PrecomputeEngineConfig {
54+
num_workers: 2,
55+
allowed_lateness_ms: 0,
56+
max_buffer_per_series: 10_000,
57+
flush_interval_ms: 100,
58+
channel_buffer_size: 10_000,
59+
pass_raw_samples: false,
60+
raw_mode_aggregation_id: 0,
61+
late_data_policy: LateDataPolicy::Drop,
62+
// Disable the wall-clock fallback: with grace=0 the trailing window can
63+
// ONLY be closed by the shutdown force-close. This isolates the fix.
64+
wall_clock_grace_period_ms: 0,
65+
}
66+
}
67+
68+
#[tokio::test]
69+
async fn netflow_single_second_batch_is_not_lost_on_shutdown() {
70+
// --- Synthesize a NetFlow JSONL where every record is in the same second.
71+
// json_ingest parses "YYYY-MM-DD HH:MM:SS" at second resolution, so all
72+
// rows collapse onto one timestamp → one window, no watermark advance.
73+
let rows = [
74+
("2024-06-01 12:00:00", "10.0.0.1", "10.0.0.2", 100.0),
75+
("2024-06-01 12:00:00", "10.0.0.3", "10.0.0.4", 250.0),
76+
("2024-06-01 12:00:00", "10.0.0.5", "10.0.0.6", 75.0),
77+
("2024-06-01 12:00:00", "10.0.0.7", "10.0.0.8", 500.0),
78+
("2024-06-01 12:00:00", "10.0.0.9", "10.0.0.10", 25.0),
79+
];
80+
let expected_total_bytes: f64 = rows.iter().map(|r| r.3).sum();
81+
82+
let path = std::env::temp_dir().join(format!(
83+
"netflow_single_second_{}.jsonl",
84+
std::process::id()
85+
));
86+
{
87+
let mut f = std::fs::File::create(&path).expect("create temp netflow file");
88+
for (ts, src, dst, bytes) in rows.iter() {
89+
writeln!(
90+
f,
91+
r#"{{"timestamp":"{ts}","src_ip":"{src}","dst_ip":"{dst}","bytes":{bytes}}}"#
92+
)
93+
.unwrap();
94+
}
95+
}
96+
97+
// --- Wire up the real engine: 1s tumbling Sum over `bytes`.
98+
let metric = "netflow_bytes";
99+
let mut agg_map = HashMap::new();
100+
agg_map.insert(1u64, netflow_agg_config(metric, 1));
101+
let streaming_config = Arc::new(StreamingConfig::new(agg_map));
102+
103+
let json_cfg = JsonFileIngestConfig {
104+
path: path.to_string_lossy().to_string(),
105+
metric_name: metric.to_string(),
106+
value_col: "bytes".to_string(),
107+
label_cols: vec![], // aggregate all flows in the second together
108+
timestamp_col: "timestamp".to_string(),
109+
timestamp_unit: TimestampUnit::Seconds,
110+
batch_size: 1024,
111+
};
112+
113+
let sink = Arc::new(CapturingOutputSink::new());
114+
let sources: Vec<Box<dyn IngestSource>> =
115+
vec![Box::new(JsonFileIngestSource::new(json_cfg))];
116+
let engine = PrecomputeEngine::new(
117+
engine_config_grace_disabled(),
118+
streaming_config,
119+
sink.clone(),
120+
sources,
121+
);
122+
123+
// The JSON source reads the file then calls broadcast_shutdown(); engine.run()
124+
// awaits the source and then the workers, so when it returns the shutdown
125+
// force-close has already emitted into the sink.
126+
engine.run().await.expect("engine run failed");
127+
128+
let _ = std::fs::remove_file(&path);
129+
130+
// --- Verify: the one-second window reached the store.
131+
let captured = sink.drain();
132+
assert_eq!(
133+
captured.len(),
134+
1,
135+
"the single-second NetFlow window must be emitted on shutdown (got {} outputs) — \
136+
before the shutdown force-close this was 0 and the store stayed empty",
137+
captured.len()
138+
);
139+
140+
let (output, acc) = &captured[0];
141+
// A 1-second tumbling window aligned to the epoch second.
142+
assert_eq!(
143+
output.end_timestamp - output.start_timestamp,
144+
1_000,
145+
"expected a 1s window, got [{}, {})",
146+
output.start_timestamp,
147+
output.end_timestamp
148+
);
149+
assert_eq!(output.start_timestamp % 1_000, 0, "window must be second-aligned");
150+
151+
let sum_acc = acc
152+
.as_any()
153+
.downcast_ref::<SumAccumulator>()
154+
.expect("NetFlow Sum aggregation should emit a SumAccumulator");
155+
assert!(
156+
(sum_acc.sum - expected_total_bytes).abs() < 1e-9,
157+
"window must hold the summed bytes of all flows in the second: expected {}, got {}",
158+
expected_total_bytes,
159+
sum_acc.sum
160+
);
161+
}

0 commit comments

Comments
 (0)