diff --git a/Cargo.lock b/Cargo.lock index af98fd1c..4b5e3321 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -914,6 +914,7 @@ dependencies = [ "hostname", "iana-time-zone", "keyring", + "libc", "mail-builder", "mime_guess2", "percent-encoding", diff --git a/Cargo.toml b/Cargo.toml index 46737bc3..ff16b12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,4 +84,5 @@ inherits = "release" lto = "thin" [dev-dependencies] +libc = "0.2" serial_test = "3.4.0" diff --git a/src/helpers/gmail/watch.rs b/src/helpers/gmail/watch.rs index 027446fd..c53ec8ab 100644 --- a/src/helpers/gmail/watch.rs +++ b/src/helpers/gmail/watch.rs @@ -990,4 +990,89 @@ mod tests { assert_eq!(requests[3].1, "authorization: Bearer pubsub-token"); assert_eq!(last_history_id, 2); } + + /// Spawns a mock server that returns empty pull responses indefinitely. + /// Used by the SIGTERM test so the loop blocks in the sleep select!. + async fn spawn_empty_pull_server() -> (String, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let _ = stream.read(&mut buf).await; + let body = r#"{"receivedMessages":[]}"#; + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/json\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(resp.as_bytes()).await; + }); + } + }); + (format!("http://{addr}"), handle) + } + + #[cfg(unix)] + #[tokio::test] + #[serial_test::serial] + async fn test_watch_pull_loop_exits_on_sigterm() { + use std::time::Duration; + + let client = reqwest::Client::new(); + // Supply enough tokens for several pull iterations + let pubsub_provider = FakeTokenProvider::new(["tok"; 16]); + let gmail_provider = FakeTokenProvider::new(["tok"; 16]); + let (base, server) = spawn_empty_pull_server().await; + let sanitize_config = crate::helpers::modelarmor::SanitizeConfig { + template: None, + mode: crate::helpers::modelarmor::SanitizeMode::Warn, + }; + let runtime = WatchRuntime { + client: &client, + pubsub_token_provider: &pubsub_provider, + gmail_token_provider: &gmail_provider, + sanitize_config: &sanitize_config, + pubsub_api_base: &base, + gmail_api_base: &base, + }; + let mut last_history_id = 0u64; + let config = WatchConfig { + project: None, + subscription: None, + topic: None, + label_ids: None, + max_messages: 1, + poll_interval: 60, // Long sleep so SIGTERM fires during the sleep select! + format: "full".to_string(), + once: false, + cleanup: false, + output_dir: None, + }; + + // Send SIGTERM after the first pull completes and the loop enters the sleep select! + tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(200)).await; + unsafe { libc::raise(libc::SIGTERM) }; + }); + + let result = tokio::time::timeout( + Duration::from_secs(2), + watch_pull_loop( + &runtime, + "projects/test/subscriptions/demo", + &mut last_history_id, + config, + ), + ) + .await; + + server.abort(); + let inner = result.expect("loop should exit on SIGTERM within 2 seconds"); + assert!(inner.is_ok(), "loop should return Ok(())"); + } } diff --git a/src/helpers/mod.rs b/src/helpers/mod.rs index 48ae881b..6284f477 100644 --- a/src/helpers/mod.rs +++ b/src/helpers/mod.rs @@ -40,9 +40,10 @@ pub(crate) const PUBSUB_API_BASE: &str = "https://pubsub.googleapis.com/v1"; /// (`gmail::watch`, `events::subscribe`) to exit cleanly under container /// orchestrators (Kubernetes, Docker, systemd) that send SIGTERM. /// -/// The signal handler is registered once in a background task on first call +/// The signal handler is registered once in a background thread on first call /// so it remains active for the lifetime of the process — no gap between -/// loop iterations. +/// loop iterations, and it survives across tokio runtime boundaries (e.g. +/// in tests where each test gets its own runtime). pub(crate) async fn shutdown_signal() { use std::sync::OnceLock; use tokio::sync::Notify; @@ -52,37 +53,45 @@ pub(crate) async fn shutdown_signal() { let notify = NOTIFY.get_or_init(|| { let n = std::sync::Arc::new(Notify::new()); let n2 = n.clone(); - tokio::spawn(async move { - #[cfg(unix)] - { - use tokio::signal::unix::{signal, SignalKind}; - match signal(SignalKind::terminate()) { - Ok(mut sigterm) => { - tokio::select! { - res = tokio::signal::ctrl_c() => { - res.expect("failed to listen for SIGINT"); + // Spawn an OS thread with its own single-threaded runtime so the + // handler outlives any individual tokio runtime (important in tests). + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build shutdown-signal runtime"); + rt.block_on(async move { + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + match signal(SignalKind::terminate()) { + Ok(mut sigterm) => { + tokio::select! { + res = tokio::signal::ctrl_c() => { + res.expect("failed to listen for SIGINT"); + } + Some(_) = sigterm.recv() => {} } - Some(_) = sigterm.recv() => {} + } + Err(e) => { + eprintln!( + "warning: could not register SIGTERM handler: {e}. \ + Listening for Ctrl+C only." + ); + tokio::signal::ctrl_c() + .await + .expect("failed to listen for SIGINT"); } } - Err(e) => { - eprintln!( - "warning: could not register SIGTERM handler: {e}. \ - Listening for Ctrl+C only." - ); - tokio::signal::ctrl_c() - .await - .expect("failed to listen for SIGINT"); - } } - } - #[cfg(not(unix))] - { - tokio::signal::ctrl_c() - .await - .expect("failed to listen for SIGINT"); - } - n2.notify_waiters(); + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for SIGINT"); + } + n2.notify_waiters(); + }); }); n });