Skip to content

Commit aef1e8f

Browse files
committed
fix: update processor test to use with_source() after Filter API change
1 parent b7c08f7 commit aef1e8f

5 files changed

Lines changed: 261 additions & 17 deletions

File tree

crates/tailflow-core/src/config.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,32 @@ pub struct FileEntry {
3636
pub label: Option<String>,
3737
}
3838

39+
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
40+
#[serde(rename_all = "kebab-case")]
41+
pub enum RestartPolicy {
42+
/// Never restart — default behaviour.
43+
#[default]
44+
Never,
45+
/// Restart after every exit, zero or non-zero.
46+
Always,
47+
/// Restart only when the process exits with a non-zero status.
48+
OnFailure,
49+
}
50+
51+
fn default_restart_delay_ms() -> u64 {
52+
1_000
53+
}
54+
3955
#[derive(Debug, Deserialize)]
4056
pub struct ProcessEntry {
4157
pub cmd: String,
4258
pub label: String,
59+
/// Restart policy on process exit. Defaults to `never`.
60+
#[serde(default)]
61+
pub restart: RestartPolicy,
62+
/// Initial restart delay in milliseconds. Doubles on each attempt, capped at 30 s.
63+
#[serde(default = "default_restart_delay_ms")]
64+
pub restart_delay_ms: u64,
4365
}
4466

4567
impl Config {
@@ -84,7 +106,9 @@ impl Config {
84106
}
85107

86108
for entry in self.sources.process {
87-
sources.push(Box::new(ProcessSource::new(entry.label, entry.cmd)));
109+
let src = ProcessSource::new(entry.label, entry.cmd)
110+
.with_restart(entry.restart, entry.restart_delay_ms);
111+
sources.push(Box::new(src));
88112
}
89113

90114
if let Some(label) = self.sources.stdin {
@@ -163,6 +187,34 @@ label = "app"
163187
assert!(cfg.sources.file[0].label.is_none());
164188
}
165189

190+
#[test]
191+
fn process_restart_defaults_to_never() {
192+
let cfg = parse("[[sources.process]]\nlabel = \"api\"\ncmd = \"go run .\"");
193+
assert_eq!(cfg.sources.process[0].restart, RestartPolicy::Never);
194+
assert_eq!(cfg.sources.process[0].restart_delay_ms, 1_000);
195+
}
196+
197+
#[test]
198+
fn process_restart_on_failure_parsed() {
199+
let cfg = parse(
200+
r#"
201+
[[sources.process]]
202+
label = "api"
203+
cmd = "go run ."
204+
restart = "on-failure"
205+
restart_delay_ms = 2000
206+
"#,
207+
);
208+
assert_eq!(cfg.sources.process[0].restart, RestartPolicy::OnFailure);
209+
assert_eq!(cfg.sources.process[0].restart_delay_ms, 2_000);
210+
}
211+
212+
#[test]
213+
fn process_restart_always_parsed() {
214+
let cfg = parse("[[sources.process]]\nlabel = \"w\"\ncmd = \"x\"\nrestart = \"always\"");
215+
assert_eq!(cfg.sources.process[0].restart, RestartPolicy::Always);
216+
}
217+
166218
#[test]
167219
fn invalid_toml_returns_error() {
168220
let result: Result<Config, _> = toml::from_str("[[[[invalid toml");

crates/tailflow-core/src/ingestion/process.rs

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use super::Source;
2-
use crate::{LogLevel, LogRecord, LogSender};
2+
use crate::{config::RestartPolicy, LogLevel, LogRecord, LogSender};
33
use anyhow::Result;
44
use chrono::Utc;
5+
use std::{process::ExitStatus, time::Duration};
56
use tokio::{
67
io::{AsyncBufReadExt, BufReader},
78
process::Command,
@@ -10,28 +11,31 @@ use tracing::{info, warn};
1011

1112
pub struct ProcessSource {
1213
label: String,
13-
/// Shell command string executed via `sh -c`
1414
cmd: String,
15+
restart_policy: RestartPolicy,
16+
/// Initial restart delay in ms; doubles each attempt, capped at 30 s.
17+
restart_delay_ms: u64,
1518
}
1619

1720
impl ProcessSource {
1821
pub fn new(label: impl Into<String>, cmd: impl Into<String>) -> Self {
1922
Self {
2023
label: label.into(),
2124
cmd: cmd.into(),
25+
restart_policy: RestartPolicy::Never,
26+
restart_delay_ms: 1_000,
2227
}
2328
}
24-
}
2529

26-
#[async_trait::async_trait]
27-
impl Source for ProcessSource {
28-
fn name(&self) -> &str {
29-
&self.label
30+
pub fn with_restart(mut self, policy: RestartPolicy, delay_ms: u64) -> Self {
31+
self.restart_policy = policy;
32+
self.restart_delay_ms = delay_ms;
33+
self
3034
}
3135

32-
async fn run(self: Box<Self>, tx: LogSender) -> Result<()> {
33-
info!(label = %self.label, cmd = %self.cmd, "spawning process");
34-
36+
/// Spawn the process once and stream its stdout/stderr into `tx`.
37+
/// Returns the process exit status.
38+
async fn run_once(&self, tx: &LogSender) -> Result<ExitStatus> {
3539
let mut child = Command::new("sh")
3640
.arg("-c")
3741
.arg(&self.cmd)
@@ -80,10 +84,73 @@ impl Source for ProcessSource {
8084
stdout_task.await.ok();
8185
stderr_task.await.ok();
8286

83-
if !status.success() {
84-
warn!(label = %self.label, code = ?status.code(), "process exited non-zero");
85-
} else {
86-
info!(label = %self.label, "process exited");
87+
Ok(status)
88+
}
89+
}
90+
91+
#[async_trait::async_trait]
92+
impl Source for ProcessSource {
93+
fn name(&self) -> &str {
94+
&self.label
95+
}
96+
97+
async fn run(self: Box<Self>, tx: LogSender) -> Result<()> {
98+
let mut attempt: u32 = 0;
99+
100+
loop {
101+
info!(label = %self.label, cmd = %self.cmd, attempt, "spawning process");
102+
103+
let status = self.run_once(&tx).await?;
104+
105+
let should_restart = match self.restart_policy {
106+
RestartPolicy::Never => false,
107+
RestartPolicy::Always => true,
108+
RestartPolicy::OnFailure => !status.success(),
109+
};
110+
111+
if !should_restart {
112+
if status.success() {
113+
info!(label = %self.label, "process exited cleanly");
114+
} else {
115+
warn!(label = %self.label, code = ?status.code(), "process exited non-zero");
116+
}
117+
break;
118+
}
119+
120+
// Stop restarting if the bus has no receivers (daemon shutting down).
121+
if tx.receiver_count() == 0 {
122+
break;
123+
}
124+
125+
// Exponential backoff: delay * 2^attempt, capped at 30 s.
126+
let delay_ms =
127+
(self.restart_delay_ms.saturating_mul(1u64 << attempt.min(5))).min(30_000);
128+
129+
let exit_desc = status
130+
.code()
131+
.map_or_else(|| "signal".to_string(), |c| c.to_string());
132+
133+
warn!(
134+
label = %self.label,
135+
exit = %exit_desc,
136+
delay_ms,
137+
attempt,
138+
"process crashed, scheduling restart"
139+
);
140+
141+
// Emit a synthetic record so the restart appears in every consumer's stream.
142+
let _ = tx.send(LogRecord {
143+
timestamp: Utc::now(),
144+
source: self.label.clone(),
145+
level: LogLevel::Warn,
146+
payload: format!(
147+
"[tailflow] process exited ({exit_desc}), restarting in {delay_ms} ms \
148+
(attempt {attempt})"
149+
),
150+
});
151+
152+
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
153+
attempt += 1;
87154
}
88155

89156
Ok(())

crates/tailflow-core/src/processor/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ pub struct Filter {
1111

1212
impl Filter {
1313
pub fn none() -> Self {
14-
Self { grep: None, source: None }
14+
Self {
15+
grep: None,
16+
source: None,
17+
}
1518
}
1619

1720
/// Build a filter that matches records whose payload matches `pattern`.
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use std::time::{Duration, Instant};
2+
use tailflow_core::{
3+
config::RestartPolicy,
4+
ingestion::{process::ProcessSource, Source},
5+
new_bus,
6+
};
7+
8+
/// Collect up to `limit` records from `rx` within `timeout`.
9+
async fn collect(
10+
mut rx: tailflow_core::LogReceiver,
11+
limit: usize,
12+
timeout: Duration,
13+
) -> Vec<String> {
14+
let deadline = Instant::now() + timeout;
15+
let mut payloads = Vec::new();
16+
while payloads.len() < limit && Instant::now() < deadline {
17+
let remaining = deadline.saturating_duration_since(Instant::now());
18+
match tokio::time::timeout(remaining, rx.recv()).await {
19+
Ok(Ok(r)) => payloads.push(r.payload),
20+
_ => break,
21+
}
22+
}
23+
payloads
24+
}
25+
26+
// ── RestartPolicy::Never ──────────────────────────────────────────────────────
27+
28+
#[tokio::test]
29+
async fn never_policy_does_not_restart_on_failure() {
30+
let (tx, rx) = new_bus();
31+
let src =
32+
ProcessSource::new("test", "echo hello && exit 1").with_restart(RestartPolicy::Never, 50);
33+
34+
tokio::spawn(async move { Box::new(src).run(tx).await });
35+
36+
let payloads = collect(rx, 10, Duration::from_secs(3)).await;
37+
// Should get exactly one "hello", no restart synthetic record.
38+
assert_eq!(payloads.iter().filter(|p| p.as_str() == "hello").count(), 1);
39+
assert!(
40+
!payloads.iter().any(|p| p.contains("[tailflow]")),
41+
"unexpected restart record with Never policy"
42+
);
43+
}
44+
45+
// ── RestartPolicy::OnFailure ──────────────────────────────────────────────────
46+
47+
#[tokio::test]
48+
async fn on_failure_restarts_after_non_zero_exit() {
49+
let (tx, rx) = new_bus();
50+
// Exits non-zero — should trigger a restart synthetic record.
51+
let src = ProcessSource::new("test", "exit 1").with_restart(RestartPolicy::OnFailure, 50);
52+
53+
tokio::spawn(async move { Box::new(src).run(tx).await });
54+
55+
let payloads = collect(rx, 5, Duration::from_secs(3)).await;
56+
assert!(
57+
payloads
58+
.iter()
59+
.any(|p| p.contains("[tailflow]") && p.contains("restarting")),
60+
"expected restart record after non-zero exit, got: {payloads:?}"
61+
);
62+
}
63+
64+
#[tokio::test]
65+
async fn on_failure_does_not_restart_after_clean_exit() {
66+
let (tx, rx) = new_bus();
67+
// Exits zero — should NOT restart.
68+
let src = ProcessSource::new("test", "echo done").with_restart(RestartPolicy::OnFailure, 50);
69+
70+
tokio::spawn(async move { Box::new(src).run(tx).await });
71+
72+
let payloads = collect(rx, 10, Duration::from_secs(2)).await;
73+
assert!(
74+
!payloads.iter().any(|p| p.contains("[tailflow]")),
75+
"unexpected restart record after clean exit"
76+
);
77+
}
78+
79+
// ── RestartPolicy::Always ─────────────────────────────────────────────────────
80+
81+
#[tokio::test]
82+
async fn always_policy_restarts_after_clean_exit() {
83+
let (tx, rx) = new_bus();
84+
// Exits zero — Always policy should still restart.
85+
let src = ProcessSource::new("test", "echo hi").with_restart(RestartPolicy::Always, 50);
86+
87+
tokio::spawn(async move { Box::new(src).run(tx).await });
88+
89+
let payloads = collect(rx, 10, Duration::from_secs(3)).await;
90+
assert!(
91+
payloads
92+
.iter()
93+
.any(|p| p.contains("[tailflow]") && p.contains("restarting")),
94+
"expected restart record with Always policy, got: {payloads:?}"
95+
);
96+
}
97+
98+
// ── Backoff ───────────────────────────────────────────────────────────────────
99+
100+
#[tokio::test]
101+
async fn restart_delay_is_respected() {
102+
let (tx, rx) = new_bus();
103+
// 200 ms initial delay so we can measure it.
104+
let src = ProcessSource::new("test", "exit 1").with_restart(RestartPolicy::OnFailure, 200);
105+
106+
tokio::spawn(async move { Box::new(src).run(tx).await });
107+
108+
let start = Instant::now();
109+
// Wait for the first restart synthetic record.
110+
let payloads = collect(rx, 2, Duration::from_secs(3)).await;
111+
let elapsed = start.elapsed();
112+
113+
assert!(
114+
payloads.iter().any(|p| p.contains("[tailflow]")),
115+
"expected restart record"
116+
);
117+
// At least the configured delay elapsed before the record appeared.
118+
assert!(
119+
elapsed >= Duration::from_millis(150),
120+
"restart fired too fast ({elapsed:?})"
121+
);
122+
}

crates/tailflow-core/tests/processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ async fn filter_none_passes_all_records() {
8181
#[tokio::test]
8282
async fn filtered_bus_matches_on_source_name() {
8383
let (tx, rx) = new_bus();
84-
let mut out = filtered_bus(rx, Filter::regex("^frontend$").unwrap());
84+
let mut out = filtered_bus(rx, Filter::none().with_source("frontend"));
8585

8686
tx.send(record("backend", "request handled")).unwrap();
8787
tx.send(record("frontend", "compiled in 1.2s")).unwrap();

0 commit comments

Comments
 (0)