Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ inherits = "release"
lto = "thin"

[dev-dependencies]
libc = "0.2"
serial_test = "3.4.0"
85 changes: 85 additions & 0 deletions src/helpers/gmail/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,4 +990,89 @@ mod tests {
assert_eq!(requests[3].1, "authorization: Bearer pubsub-token");
assert_eq!(last_history_id, 2);
}

/// Spawns a mock server that returns empty pull responses indefinitely.
/// Used by the SIGTERM test so the loop blocks in the sleep select!.
async fn spawn_empty_pull_server() -> (String, tokio::task::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let handle = tokio::spawn(async move {
loop {
let Ok((mut stream, _)) = listener.accept().await else {
break;
};
tokio::spawn(async move {
let mut buf = [0u8; 4096];
let _ = stream.read(&mut buf).await;
let body = r#"{"receivedMessages":[]}"#;
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/json\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(resp.as_bytes()).await;
});
}
});
(format!("http://{addr}"), handle)
}

#[cfg(unix)]
#[tokio::test]
#[serial_test::serial]
async fn test_watch_pull_loop_exits_on_sigterm() {
use std::time::Duration;

let client = reqwest::Client::new();
// Supply enough tokens for several pull iterations
let pubsub_provider = FakeTokenProvider::new(["tok"; 16]);
let gmail_provider = FakeTokenProvider::new(["tok"; 16]);
let (base, server) = spawn_empty_pull_server().await;
let sanitize_config = crate::helpers::modelarmor::SanitizeConfig {
template: None,
mode: crate::helpers::modelarmor::SanitizeMode::Warn,
};
let runtime = WatchRuntime {
client: &client,
pubsub_token_provider: &pubsub_provider,
gmail_token_provider: &gmail_provider,
sanitize_config: &sanitize_config,
pubsub_api_base: &base,
gmail_api_base: &base,
};
let mut last_history_id = 0u64;
let config = WatchConfig {
project: None,
subscription: None,
topic: None,
label_ids: None,
max_messages: 1,
poll_interval: 60, // Long sleep so SIGTERM fires during the sleep select!
format: "full".to_string(),
once: false,
cleanup: false,
output_dir: None,
};

// Send SIGTERM after the first pull completes and the loop enters the sleep select!
tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(200)).await;
unsafe { libc::raise(libc::SIGTERM) };
});

let result = tokio::time::timeout(
Duration::from_secs(2),
watch_pull_loop(
&runtime,
"projects/test/subscriptions/demo",
&mut last_history_id,
config,
),
)
.await;

server.abort();
let inner = result.expect("loop should exit on SIGTERM within 2 seconds");
assert!(inner.is_ok(), "loop should return Ok(())");
}
}
67 changes: 38 additions & 29 deletions src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ pub(crate) const PUBSUB_API_BASE: &str = "https://pubsub.googleapis.com/v1";
/// (`gmail::watch`, `events::subscribe`) to exit cleanly under container
/// orchestrators (Kubernetes, Docker, systemd) that send SIGTERM.
///
/// The signal handler is registered once in a background task on first call
/// The signal handler is registered once in a background thread on first call
/// so it remains active for the lifetime of the process — no gap between
/// loop iterations.
/// loop iterations, and it survives across tokio runtime boundaries (e.g.
/// in tests where each test gets its own runtime).
pub(crate) async fn shutdown_signal() {
use std::sync::OnceLock;
use tokio::sync::Notify;
Expand All @@ -52,37 +53,45 @@ pub(crate) async fn shutdown_signal() {
let notify = NOTIFY.get_or_init(|| {
let n = std::sync::Arc::new(Notify::new());
let n2 = n.clone();
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
match signal(SignalKind::terminate()) {
Ok(mut sigterm) => {
tokio::select! {
res = tokio::signal::ctrl_c() => {
res.expect("failed to listen for SIGINT");
// Spawn an OS thread with its own single-threaded runtime so the
// handler outlives any individual tokio runtime (important in tests).
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build shutdown-signal runtime");
rt.block_on(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
match signal(SignalKind::terminate()) {
Ok(mut sigterm) => {
tokio::select! {
res = tokio::signal::ctrl_c() => {
res.expect("failed to listen for SIGINT");
}
Some(_) = sigterm.recv() => {}
}
Some(_) = sigterm.recv() => {}
}
Err(e) => {
eprintln!(
"warning: could not register SIGTERM handler: {e}. \
Listening for Ctrl+C only."
);
tokio::signal::ctrl_c()
.await
.expect("failed to listen for SIGINT");
}
}
Err(e) => {
eprintln!(
"warning: could not register SIGTERM handler: {e}. \
Listening for Ctrl+C only."
);
tokio::signal::ctrl_c()
.await
.expect("failed to listen for SIGINT");
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.expect("failed to listen for SIGINT");
}
n2.notify_waiters();
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.expect("failed to listen for SIGINT");
}
n2.notify_waiters();
});
});
n
});
Expand Down
Loading