Skip to content

Commit cd6748d

Browse files
committed
fix: ring buffer O(n)→VecDeque, TUI layout guard, mutex poison recovery, cache filter regex, log task panics, SSE serialize error
1 parent 0b7b830 commit cd6748d

5 files changed

Lines changed: 35 additions & 20 deletions

File tree

crates/tailflow-core/src/ingestion/process.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,12 @@ impl ProcessSource {
8181
});
8282

8383
let status = child.wait().await?;
84-
stdout_task.await.ok();
85-
stderr_task.await.ok();
84+
if let Err(e) = stdout_task.await {
85+
tracing::warn!(label = %self.label, err = ?e, "stdout reader task panicked");
86+
}
87+
if let Err(e) = stderr_task.await {
88+
tracing::warn!(label = %self.label, err = ?e, "stderr reader task panicked");
89+
}
8690

8791
Ok(status)
8892
}

crates/tailflow-daemon/src/routes.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,13 @@ async fn sse_handler(
7575
let filter = params.into_filter();
7676
let rx = state.tx.subscribe();
7777
let stream = BroadcastStream::new(rx).filter_map(move |res| match res {
78-
Ok(record) if filter.matches(&record) => {
79-
let data = serde_json::to_string(&record).unwrap_or_default();
80-
Some(Ok(Event::default().data(data)))
81-
}
78+
Ok(record) if filter.matches(&record) => match serde_json::to_string(&record) {
79+
Ok(data) => Some(Ok(Event::default().data(data))),
80+
Err(e) => {
81+
tracing::error!(err = %e, "failed to serialize log record for SSE");
82+
None
83+
}
84+
},
8285
Ok(_) => None,
8386
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
8487
tracing::warn!(dropped = n, "SSE client lagged");
@@ -99,7 +102,7 @@ async fn records_handler(
99102
let records: Vec<_> = state
100103
.ring
101104
.lock()
102-
.unwrap()
105+
.unwrap_or_else(|p| p.into_inner()) // recover from poisoned mutex
103106
.iter()
104107
.filter(|r| filter.matches(r))
105108
.cloned()

crates/tailflow-daemon/src/state.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::sync::{Arc, Mutex};
1+
use std::{
2+
collections::VecDeque,
3+
sync::{Arc, Mutex},
4+
};
25
use tailflow_core::{LogReceiver, LogRecord, LogSender};
36
use tokio::sync::broadcast;
47

@@ -9,7 +12,7 @@ pub struct AppState {
912
/// Subscribe to the live stream by calling `tx.subscribe()`.
1013
pub tx: LogSender,
1114
/// Rolling buffer of the last RING_SIZE records (for `/api/records`).
12-
pub ring: Mutex<Vec<LogRecord>>,
15+
pub ring: Mutex<VecDeque<LogRecord>>,
1316
}
1417

1518
impl AppState {
@@ -18,7 +21,7 @@ impl AppState {
1821
let (tx, _) = broadcast::channel(tailflow_core::BUS_CAPACITY);
1922
let state = Arc::new(AppState {
2023
tx: tx.clone(),
21-
ring: Mutex::new(Vec::with_capacity(RING_SIZE)),
24+
ring: Mutex::new(VecDeque::with_capacity(RING_SIZE)),
2225
});
2326
let state2 = state.clone();
2427

@@ -27,11 +30,11 @@ impl AppState {
2730
match source_rx.recv().await {
2831
Ok(record) => {
2932
{
30-
let mut buf = state2.ring.lock().unwrap();
33+
let mut buf = state2.ring.lock().unwrap_or_else(|p| p.into_inner());
3134
if buf.len() >= RING_SIZE {
32-
buf.remove(0);
35+
buf.pop_front(); // O(1) vs Vec::remove(0) O(n)
3336
}
34-
buf.push(record.clone());
37+
buf.push_back(record.clone());
3538
}
3639
let _ = tx.send(record);
3740
}

crates/tailflow-tui/src/app.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crossterm::{
66
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
77
};
88
use ratatui::{backend::CrosstermBackend, Terminal};
9+
use regex::Regex;
910
use std::{io, time::Duration};
1011
use tailflow_core::{LogReceiver, LogRecord};
1112
use tokio::sync::broadcast;
@@ -15,6 +16,8 @@ const MAX_RECORDS: usize = 2000;
1516
pub struct App {
1617
pub records: Vec<LogRecord>,
1718
pub filter: String,
19+
/// Compiled regex for `filter`; kept in sync whenever `filter` changes.
20+
pub filter_re: Option<Regex>,
1821
pub filter_mode: bool,
1922
rx: LogReceiver,
2023
pub scroll: usize,
@@ -66,6 +69,7 @@ impl App {
6669
Self {
6770
records: Vec::with_capacity(MAX_RECORDS),
6871
filter: String::new(),
72+
filter_re: None,
6973
filter_mode: false,
7074
rx,
7175
scroll: 0,
@@ -159,9 +163,11 @@ impl App {
159163
}
160164
(true, KeyCode::Backspace) => {
161165
self.filter.pop();
166+
self.filter_re = Regex::new(&self.filter).ok();
162167
}
163168
(true, KeyCode::Char(c)) => {
164169
self.filter.push(c);
170+
self.filter_re = Regex::new(&self.filter).ok();
165171
}
166172

167173
_ => {}

crates/tailflow-tui/src/ui/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,20 @@ pub fn draw(f: &mut Frame, app: &mut App) {
2020
])
2121
.split(area);
2222

23+
// Guard against degenerate terminal sizes
24+
if chunks.len() < 3 {
25+
return;
26+
}
2327
let list_height = chunks[1].height as usize;
2428

25-
// ── Build filter predicate ─────────────────────────────────────────────
26-
let filter_re = if !app.filter.is_empty() {
27-
regex::Regex::new(&app.filter).ok()
28-
} else {
29-
None
30-
};
29+
// ── Build filter predicate (uses pre-compiled regex from App) ──────────
3130
let filter_lower = app.filter.to_lowercase();
3231

3332
let matches = |payload: &str, source: &str| -> bool {
3433
if app.filter.is_empty() {
3534
return true;
3635
}
37-
if let Some(re) = &filter_re {
36+
if let Some(re) = &app.filter_re {
3837
re.is_match(payload) || re.is_match(source)
3938
} else {
4039
payload.to_lowercase().contains(&filter_lower)

0 commit comments

Comments
 (0)