diff --git a/src/profiler.rs b/src/profiler.rs index 17e165a..983ac35 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -10,7 +10,7 @@ use crate::{ }; use std::{ fs::File, - io, mem, + io, path::{Path, PathBuf}, time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; @@ -590,6 +590,53 @@ pub(crate) trait ProfilerEngine: Send + Sync + 'static { fn stop_async_profiler() -> Result<(), asprof::AsProfError>; } +/// Holds the profiler task state and performs a final synchronous report +/// via [`Reporter::report_blocking`] when the task is cancelled (e.g. Tokio +/// runtime shutdown) before a graceful stop. +struct ProfilerTaskState { + state: ProfilerState, + reporter: Box, + agent_metadata: Option, + reporting_interval: Duration, + completed_normally: bool, +} + +impl ProfilerTaskState { + fn try_final_report(&mut self) -> Result<(), Box> { + let start = self.state.stop()?.ok_or("profiler was not running")?; + let jfr_file = self.state.jfr_file.as_ref().ok_or("jfr file missing")?; + let jfr_path = jfr_file.active_path(); + if jfr_path.metadata()?.len() == 0 { + return Ok(()); + } + let metadata = ReportMetadata { + instance: self + .agent_metadata + .as_ref() + .unwrap_or(&AgentMetadata::NoMetadata), + start: start.duration_since(UNIX_EPOCH)?, + end: SystemTime::now().duration_since(UNIX_EPOCH)?, + reporting_interval: self.reporting_interval, + }; + self.reporter + .report_blocking(&jfr_path, &metadata) + .map_err(|e| e.to_string())?; + Ok(()) + } +} + +impl Drop for ProfilerTaskState { + fn drop(&mut self) { + if self.completed_normally || !self.state.is_started() { + return; + } + tracing::info!("profiler task cancelled, attempting final report on drop"); + if let Err(err) = self.try_final_report() { + tracing::warn!(?err, "failed to report on drop"); + } + } +} + #[derive(Debug, Error)] #[non_exhaustive] enum TickError { @@ -1092,18 +1139,6 @@ impl Profiler { } fn spawn_inner(self, asprof: E) -> Result { - struct LogOnDrop; - impl Drop for LogOnDrop { - fn drop(&mut self) { - // Tokio will call destructors during runtime shutdown. Have something that will - // emit a log in that case - tracing::info!( - "unable to upload the last jfr due to Tokio runtime shutdown. \ - Add a call to `RunningProfiler::stop` to wait for jfr upload to finish." - ); - } - } - // Initialize async profiler - needs to be done once. E::init_async_profiler()?; tracing::info!("successfully initialized async profiler."); @@ -1113,7 +1148,7 @@ impl Profiler { // Get profiles at the configured interval rate. let join_handle = tokio::spawn(async move { - let mut state = match ProfilerState::new(asprof, self.profiler_options) { + let state = match ProfilerState::new(asprof, self.profiler_options) { Ok(state) => state, Err(err) => { tracing::error!(?err, "unable to create profiler state"); @@ -1121,11 +1156,15 @@ impl Profiler { } }; - // Lazily-loaded if not specified up front. - let mut agent_metadata = self.agent_metadata; - let mut done = false; + let mut task = ProfilerTaskState { + state, + reporter: self.reporter, + agent_metadata: self.agent_metadata, + reporting_interval: self.reporting_interval, + completed_normally: false, + }; - let guard = LogOnDrop; + let mut done = false; while !done { // Wait until a timer or exit event tokio::select! { @@ -1145,10 +1184,10 @@ impl Profiler { } if let Err(err) = profiler_tick( - &mut state, - &mut agent_metadata, - &*self.reporter, - self.reporting_interval, + &mut task.state, + &mut task.agent_metadata, + &*task.reporter, + task.reporting_interval, ) .await { @@ -1165,7 +1204,7 @@ impl Profiler { } } - mem::forget(guard); + task.completed_normally = true; tracing::info!("profiling task finished"); }); @@ -1575,4 +1614,93 @@ mod tests { bad => panic!("{bad:?}"), }; } + + /// A reporter that tracks both async and blocking reports separately. + struct BlockingMockReporter { + async_tx: tokio::sync::mpsc::Sender, + blocking_reports: Arc>>, + } + impl std::fmt::Debug for BlockingMockReporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockingMockReporter").finish() + } + } + + #[async_trait::async_trait] + impl Reporter for BlockingMockReporter { + async fn report( + &self, + jfr: Vec, + _metadata: &ReportMetadata, + ) -> Result<(), Box> { + self.async_tx + .send(String::from_utf8(jfr).unwrap()) + .await + .unwrap(); + Ok(()) + } + + fn report_blocking( + &self, + jfr_path: &Path, + _metadata: &ReportMetadata, + ) -> Result<(), Box> { + let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?; + self.blocking_reports + .lock() + .unwrap() + .push(String::from_utf8(jfr).unwrap()); + Ok(()) + } + } + + /// Simulates a runtime shutdown while the profiler is running. + /// The profiler should call report_blocking on drop to flush the + /// last sample. + #[test] + fn test_profiler_report_on_drop() { + let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new())); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .start_paused(true) + .build() + .unwrap(); + + let reports_clone = blocking_reports.clone(); + rt.block_on(async { + let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::(10); + let agent = ProfilerBuilder::default() + .with_reporter(BlockingMockReporter { + async_tx, + blocking_reports: reports_clone, + }) + .with_custom_agent_metadata(AgentMetadata::NoMetadata) + .build(); + // Detach so the stop channel doesn't trigger a graceful stop + // when the block_on future returns. + agent + .spawn_inner::(MockProfilerEngine { + counter: AtomicU32::new(0), + }) + .unwrap() + .detach(); + + // Wait for first async report to confirm profiler is running + let jfr = async_rx.recv().await.unwrap(); + assert_eq!(jfr, "JFR0"); + // Return without stopping — runtime drop will cancel the task. + }); + + // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop. + drop(rt); + + let reports = blocking_reports.lock().unwrap(); + assert_eq!( + reports.len(), + 1, + "expected exactly one blocking report on drop" + ); + assert_eq!(reports[0], "JFR1"); + } } diff --git a/src/reporter/local.rs b/src/reporter/local.rs index b051a5c..2c14a0f 100644 --- a/src/reporter/local.rs +++ b/src/reporter/local.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use chrono::SecondsFormat; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::time::SystemTime; use thiserror::Error; @@ -61,18 +61,22 @@ impl LocalReporter { } } + fn jfr_file_name() -> String { + let time: chrono::DateTime = SystemTime::now().into(); + let time = time + .to_rfc3339_opts(SecondsFormat::Secs, true) + .replace(":", "-"); + format!("{time}.jfr") + } + /// Writes the jfr file to disk. async fn report_profiling_data( &self, jfr: Vec, _metadata_obj: &ReportMetadata<'_>, ) -> Result<(), std::io::Error> { - let time: chrono::DateTime = SystemTime::now().into(); - let time = time - .to_rfc3339_opts(SecondsFormat::Secs, true) - .replace(":", "-"); - tracing::debug!("reporting {time}.jfr"); - let file_name = format!("{time}.jfr"); + let file_name = Self::jfr_file_name(); + tracing::debug!("reporting {file_name}"); tokio::fs::write(self.directory.join(file_name), jfr).await?; Ok(()) } @@ -89,6 +93,18 @@ impl Reporter for LocalReporter { .await .map_err(|e| Box::new(LocalReporterError::IoError(e)) as _) } + + fn report_blocking( + &self, + jfr_path: &Path, + _metadata: &ReportMetadata, + ) -> Result<(), Box> { + let file_name = Self::jfr_file_name(); + tracing::debug!("reporting {file_name} (blocking)"); + std::fs::copy(jfr_path, self.directory.join(file_name)) + .map(|_| ()) + .map_err(|e| Box::new(LocalReporterError::IoError(e)) as _) + } } #[cfg(test)] @@ -120,4 +136,25 @@ mod test { .unwrap(); assert_eq!(tokio::fs::read(jfr_file.path()).await.unwrap(), b"JFR"); } + + #[test] + fn test_local_reporter_reports_on_drop() { + let dir = tempfile::tempdir().unwrap(); + let src = dir.path().join("input.jfr"); + std::fs::write(&src, b"JFR-DROP").unwrap(); + let out_dir = tempfile::tempdir().unwrap(); + let reporter = LocalReporter::new(out_dir.path()); + reporter.report_blocking(&src, &DUMMY_METADATA).unwrap(); + let jfr_file = std::fs::read_dir(out_dir.path()) + .unwrap() + .flat_map(|f| f.ok()) + .filter(|f| { + Path::new(&f.file_name()) + .extension() + .is_some_and(|e| e == "jfr") + }) + .next() + .unwrap(); + assert_eq!(std::fs::read(jfr_file.path()).unwrap(), b"JFR-DROP"); + } } diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index 482d2ce..afc8e9e 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -9,6 +9,7 @@ //! 3. [multi::MultiReporter], which allows combining multiple reporters. use std::fmt; +use std::path::Path; use async_trait::async_trait; @@ -38,4 +39,21 @@ pub trait Reporter: fmt::Debug { jfr: Vec, metadata: &ReportMetadata, ) -> Result<(), Box>; + + /// Synchronously report profiling data. Called during drop when the + /// async runtime is shutting down and async reporting is not possible. + /// + /// The default implementation does nothing. Reporters that can perform + /// synchronous I/O (like [`local::LocalReporter`]) should override this. + fn report_blocking( + &self, + _jfr_path: &Path, + _metadata: &ReportMetadata, + ) -> Result<(), Box> { + tracing::info!( + "reporter does not support synchronous reporting, last sample will be lost. \ + Add a call to `RunningProfiler::stop` to wait for the upload to finish." + ); + Ok(()) + } } diff --git a/src/reporter/multi.rs b/src/reporter/multi.rs index 4fa1e6b..3ffbf10 100644 --- a/src/reporter/multi.rs +++ b/src/reporter/multi.rs @@ -7,6 +7,7 @@ use crate::metadata::ReportMetadata; use super::Reporter; use std::fmt; +use std::path::Path; /// An aggregated error that contains an error per reporter. A reporter is identified /// by the result of its Debug impl. @@ -109,6 +110,28 @@ impl Reporter for MultiReporter { Err(Box::new(MultiError { errors })) } } + + fn report_blocking( + &self, + jfr_path: &Path, + metadata: &ReportMetadata, + ) -> Result<(), Box> { + let errors: Vec<_> = self + .reporters + .iter() + .filter_map(|reporter| { + reporter + .report_blocking(jfr_path, metadata) + .err() + .map(|e| (format!("{reporter:?}"), e)) + }) + .collect(); + if errors.is_empty() { + Ok(()) + } else { + Err(Box::new(MultiError { errors })) + } + } } #[cfg(test)] diff --git a/tests/integration.sh b/tests/integration.sh index 8baebf4..884d8e1 100755 --- a/tests/integration.sh +++ b/tests/integration.sh @@ -12,12 +12,12 @@ dir="profiles" mkdir -p $dir rm -f $dir/*.jfr -# test the "unable to uploqad the last jfr" logic +# test the drop-path final report logic rm -rf $dir/short mkdir $dir/short ./simple --local $dir/short --duration 1s --reporting-interval 10s --no-clean-stop >$dir/short/log cat $dir/short/log -grep "unable to upload the last jfr" $dir/short/log +grep "profiler task cancelled, attempting final report on drop" $dir/short/log rm -rf $dir/short # Pass --worker-threads 16 to make the test much less flaky since there is always some worker thread running