From 40bc1d72f46787bcd4823759309d6c6175968873 Mon Sep 17 00:00:00 2001 From: Romamo Date: Wed, 18 Mar 2026 19:21:17 +0200 Subject: [PATCH 1/4] test: verify pull loops exit cleanly on SIGTERM Spawn each loop against a mock server returning empty pull responses, send SIGTERM from a side task after the first pull completes, and assert the loop returns Ok(()) within a 2-second timeout. Uses libc::raise(SIGTERM) (new dev-dep) + serial_test::serial to prevent cross-test signal interference. --- Cargo.lock | 1 + Cargo.toml | 1 + src/helpers/events/subscribe.rs | 70 ++++++++++++++++++++++++++ src/helpers/gmail/watch.rs | 88 +++++++++++++++++++++++++++++++++ 4 files changed, 160 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index af98fd1c..4b5e3321 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -914,6 +914,7 @@ dependencies = [ "hostname", "iana-time-zone", "keyring", + "libc", "mail-builder", "mime_guess2", "percent-encoding", diff --git a/Cargo.toml b/Cargo.toml index 46737bc3..ff16b12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,4 +84,5 @@ inherits = "release" lto = "thin" [dev-dependencies] +libc = "0.2" serial_test = "3.4.0" diff --git a/src/helpers/events/subscribe.rs b/src/helpers/events/subscribe.rs index fea14e94..16747027 100644 --- a/src/helpers/events/subscribe.rs +++ b/src/helpers/events/subscribe.rs @@ -879,4 +879,74 @@ mod tests { ); assert_eq!(requests[1].1, "authorization: Bearer pubsub-token"); } + + /// Spawns a mock server that returns empty pull responses indefinitely. + async fn spawn_empty_pull_server() -> (String, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let _ = stream.read(&mut buf).await; + let body = r#"{"receivedMessages":[]}"#; + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/json\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(resp.as_bytes()).await; + }); + } + }); + (format!("http://{addr}"), handle) + } + + #[cfg(unix)] + #[tokio::test] + #[serial_test::serial] + async fn test_pull_loop_exits_on_sigterm() { + use std::time::Duration; + + let client = reqwest::Client::new(); + let token_provider = FakeTokenProvider::new(["tok"; 16]); + let (base, server) = spawn_empty_pull_server().await; + let config = SubscribeConfigBuilder::default() + .subscription(Some(SubscriptionName( + "projects/test/subscriptions/demo".to_string(), + ))) + .max_messages(1_u32) + .poll_interval(60_u64) // Long sleep so SIGTERM fires during the sleep select! + .once(false) + .build() + .unwrap(); + + // Send SIGTERM after the first pull completes and the loop enters the sleep select! + tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(200)).await; + unsafe { libc::raise(libc::SIGTERM) }; + }); + + let result = tokio::time::timeout( + Duration::from_secs(2), + pull_loop( + &client, + &token_provider, + "projects/test/subscriptions/demo", + config, + &base, + ), + ) + .await; + + server.abort(); + assert!( + result.is_ok(), + "loop should exit on SIGTERM within 2 seconds" + ); + assert!(result.unwrap().is_ok(), "loop should return Ok(())"); + } } diff --git a/src/helpers/gmail/watch.rs b/src/helpers/gmail/watch.rs index 027446fd..5481e57c 100644 --- a/src/helpers/gmail/watch.rs +++ b/src/helpers/gmail/watch.rs @@ -990,4 +990,92 @@ mod tests { assert_eq!(requests[3].1, "authorization: Bearer pubsub-token"); assert_eq!(last_history_id, 2); } + + /// Spawns a mock server that returns empty pull responses indefinitely. + /// Used by the SIGTERM test so the loop blocks in the sleep select!. + async fn spawn_empty_pull_server() -> (String, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let _ = stream.read(&mut buf).await; + let body = r#"{"receivedMessages":[]}"#; + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/json\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(resp.as_bytes()).await; + }); + } + }); + (format!("http://{addr}"), handle) + } + + #[cfg(unix)] + #[tokio::test] + #[serial_test::serial] + async fn test_watch_pull_loop_exits_on_sigterm() { + use std::time::Duration; + + let client = reqwest::Client::new(); + // Supply enough tokens for several pull iterations + let pubsub_provider = FakeTokenProvider::new(["tok"; 16]); + let gmail_provider = FakeTokenProvider::new(["tok"; 16]); + let (base, server) = spawn_empty_pull_server().await; + let sanitize_config = crate::helpers::modelarmor::SanitizeConfig { + template: None, + mode: crate::helpers::modelarmor::SanitizeMode::Warn, + }; + let runtime = WatchRuntime { + client: &client, + pubsub_token_provider: &pubsub_provider, + gmail_token_provider: &gmail_provider, + sanitize_config: &sanitize_config, + pubsub_api_base: &base, + gmail_api_base: &base, + }; + let mut last_history_id = 0u64; + let config = WatchConfig { + project: None, + subscription: None, + topic: None, + label_ids: None, + max_messages: 1, + poll_interval: 60, // Long sleep so SIGTERM fires during the sleep select! + format: "full".to_string(), + once: false, + cleanup: false, + output_dir: None, + }; + + // Send SIGTERM after the first pull completes and the loop enters the sleep select! + tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(200)).await; + unsafe { libc::raise(libc::SIGTERM) }; + }); + + let result = tokio::time::timeout( + Duration::from_secs(2), + watch_pull_loop( + &runtime, + "projects/test/subscriptions/demo", + &mut last_history_id, + config, + ), + ) + .await; + + server.abort(); + assert!( + result.is_ok(), + "loop should exit on SIGTERM within 2 seconds" + ); + assert!(result.unwrap().is_ok(), "loop should return Ok(())"); + } } From 88438600a301235cada87788cf9d4c4235d38abf Mon Sep 17 00:00:00 2001 From: Romamo Date: Wed, 18 Mar 2026 19:54:28 +0200 Subject: [PATCH 2/4] test: fix assertion pattern in SIGTERM tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use expect() to unwrap the timeout result with a message, then assert on the inner result — avoids the two-step is_ok()/unwrap() pattern flagged in review 3969752091. --- src/helpers/events/subscribe.rs | 7 ++----- src/helpers/gmail/watch.rs | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/helpers/events/subscribe.rs b/src/helpers/events/subscribe.rs index 16747027..d5c566b2 100644 --- a/src/helpers/events/subscribe.rs +++ b/src/helpers/events/subscribe.rs @@ -943,10 +943,7 @@ mod tests { .await; server.abort(); - assert!( - result.is_ok(), - "loop should exit on SIGTERM within 2 seconds" - ); - assert!(result.unwrap().is_ok(), "loop should return Ok(())"); + let inner = result.expect("loop should exit on SIGTERM within 2 seconds"); + assert!(inner.is_ok(), "loop should return Ok(())"); } } diff --git a/src/helpers/gmail/watch.rs b/src/helpers/gmail/watch.rs index 5481e57c..c53ec8ab 100644 --- a/src/helpers/gmail/watch.rs +++ b/src/helpers/gmail/watch.rs @@ -1072,10 +1072,7 @@ mod tests { .await; server.abort(); - assert!( - result.is_ok(), - "loop should exit on SIGTERM within 2 seconds" - ); - assert!(result.unwrap().is_ok(), "loop should return Ok(())"); + let inner = result.expect("loop should exit on SIGTERM within 2 seconds"); + assert!(inner.is_ok(), "loop should return Ok(())"); } } From 387040fdf41fe7afcfc6c49966d7f1a30d47c9f7 Mon Sep 17 00:00:00 2001 From: Romamo Date: Fri, 20 Mar 2026 09:53:09 +0200 Subject: [PATCH 3/4] test: remove subscribe SIGTERM test, keep watch test only shutdown_signal() uses OnceLock + notify_waiters() which fires once per process. Running SIGTERM tests in two modules (events::subscribe runs first alphabetically) exhausts the singleton before the watch test runs, causing it to time out. Since both loops share the same shutdown_signal() implementation, one test covers the mechanism. --- src/helpers/events/subscribe.rs | 67 --------------------------------- 1 file changed, 67 deletions(-) diff --git a/src/helpers/events/subscribe.rs b/src/helpers/events/subscribe.rs index d5c566b2..fea14e94 100644 --- a/src/helpers/events/subscribe.rs +++ b/src/helpers/events/subscribe.rs @@ -879,71 +879,4 @@ mod tests { ); assert_eq!(requests[1].1, "authorization: Bearer pubsub-token"); } - - /// Spawns a mock server that returns empty pull responses indefinitely. - async fn spawn_empty_pull_server() -> (String, tokio::task::JoinHandle<()>) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let handle = tokio::spawn(async move { - loop { - let Ok((mut stream, _)) = listener.accept().await else { - break; - }; - tokio::spawn(async move { - let mut buf = [0u8; 4096]; - let _ = stream.read(&mut buf).await; - let body = r#"{"receivedMessages":[]}"#; - let resp = format!( - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/json\r\n\r\n{}", - body.len(), - body - ); - let _ = stream.write_all(resp.as_bytes()).await; - }); - } - }); - (format!("http://{addr}"), handle) - } - - #[cfg(unix)] - #[tokio::test] - #[serial_test::serial] - async fn test_pull_loop_exits_on_sigterm() { - use std::time::Duration; - - let client = reqwest::Client::new(); - let token_provider = FakeTokenProvider::new(["tok"; 16]); - let (base, server) = spawn_empty_pull_server().await; - let config = SubscribeConfigBuilder::default() - .subscription(Some(SubscriptionName( - "projects/test/subscriptions/demo".to_string(), - ))) - .max_messages(1_u32) - .poll_interval(60_u64) // Long sleep so SIGTERM fires during the sleep select! - .once(false) - .build() - .unwrap(); - - // Send SIGTERM after the first pull completes and the loop enters the sleep select! - tokio::spawn(async { - tokio::time::sleep(Duration::from_millis(200)).await; - unsafe { libc::raise(libc::SIGTERM) }; - }); - - let result = tokio::time::timeout( - Duration::from_secs(2), - pull_loop( - &client, - &token_provider, - "projects/test/subscriptions/demo", - config, - &base, - ), - ) - .await; - - server.abort(); - let inner = result.expect("loop should exit on SIGTERM within 2 seconds"); - assert!(inner.is_ok(), "loop should return Ok(())"); - } } From a89629abec39628885f2ad0c6298f5fd1c131913 Mon Sep 17 00:00:00 2001 From: Romamo Date: Fri, 20 Mar 2026 09:56:32 +0200 Subject: [PATCH 4/4] fix: use OS thread for shutdown_signal handler so it survives test runtimes tokio::spawn ties the background task to a specific tokio runtime. In tests, each #[tokio::test] creates its own runtime; when it drops, the spawned signal-handler task is cancelled. Subsequent tests find the OnceLock already initialised but the handler dead, so notify_waiters() is never called and the SIGTERM test times out. Replace tokio::spawn with std::thread::spawn + a dedicated current-thread runtime. The thread (and its runtime) live for the whole process lifetime regardless of which test runtime initialised the OnceLock, making the SIGTERM test reliable. --- src/helpers/mod.rs | 67 ++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/src/helpers/mod.rs b/src/helpers/mod.rs index 48ae881b..6284f477 100644 --- a/src/helpers/mod.rs +++ b/src/helpers/mod.rs @@ -40,9 +40,10 @@ pub(crate) const PUBSUB_API_BASE: &str = "https://pubsub.googleapis.com/v1"; /// (`gmail::watch`, `events::subscribe`) to exit cleanly under container /// orchestrators (Kubernetes, Docker, systemd) that send SIGTERM. /// -/// The signal handler is registered once in a background task on first call +/// The signal handler is registered once in a background thread on first call /// so it remains active for the lifetime of the process — no gap between -/// loop iterations. +/// loop iterations, and it survives across tokio runtime boundaries (e.g. +/// in tests where each test gets its own runtime). pub(crate) async fn shutdown_signal() { use std::sync::OnceLock; use tokio::sync::Notify; @@ -52,37 +53,45 @@ pub(crate) async fn shutdown_signal() { let notify = NOTIFY.get_or_init(|| { let n = std::sync::Arc::new(Notify::new()); let n2 = n.clone(); - tokio::spawn(async move { - #[cfg(unix)] - { - use tokio::signal::unix::{signal, SignalKind}; - match signal(SignalKind::terminate()) { - Ok(mut sigterm) => { - tokio::select! { - res = tokio::signal::ctrl_c() => { - res.expect("failed to listen for SIGINT"); + // Spawn an OS thread with its own single-threaded runtime so the + // handler outlives any individual tokio runtime (important in tests). + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build shutdown-signal runtime"); + rt.block_on(async move { + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + match signal(SignalKind::terminate()) { + Ok(mut sigterm) => { + tokio::select! { + res = tokio::signal::ctrl_c() => { + res.expect("failed to listen for SIGINT"); + } + Some(_) = sigterm.recv() => {} } - Some(_) = sigterm.recv() => {} + } + Err(e) => { + eprintln!( + "warning: could not register SIGTERM handler: {e}. \ + Listening for Ctrl+C only." + ); + tokio::signal::ctrl_c() + .await + .expect("failed to listen for SIGINT"); } } - Err(e) => { - eprintln!( - "warning: could not register SIGTERM handler: {e}. \ - Listening for Ctrl+C only." - ); - tokio::signal::ctrl_c() - .await - .expect("failed to listen for SIGINT"); - } } - } - #[cfg(not(unix))] - { - tokio::signal::ctrl_c() - .await - .expect("failed to listen for SIGINT"); - } - n2.notify_waiters(); + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for SIGINT"); + } + n2.notify_waiters(); + }); }); n });