Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 151 additions & 23 deletions src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use std::{
fs::File,
io, mem,
io,
path::{Path, PathBuf},
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};
Expand Down Expand Up @@ -590,6 +590,53 @@ pub(crate) trait ProfilerEngine: Send + Sync + 'static {
fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}

/// Holds the profiler task state and performs a final synchronous report
/// via [`Reporter::report_blocking`] when the task is cancelled (e.g. Tokio
/// runtime shutdown) before a graceful stop.
///
/// The final report is attempted from this type's `Drop` implementation and
/// is skipped when `completed_normally` is set or the profiler is not started.
struct ProfilerTaskState<E: ProfilerEngine> {
/// Profiler state; the final report stops it and reads its JFR file.
state: ProfilerState<E>,
/// Reporter that receives the final report through `report_blocking`.
reporter: Box<dyn Reporter + Send + Sync>,
/// Agent metadata; when `None`, the final report uses
/// `AgentMetadata::NoMetadata` instead.
agent_metadata: Option<AgentMetadata>,
/// Reporting interval copied into the final report's metadata.
reporting_interval: Duration,
/// When `true`, dropping this value does not attempt a final report.
completed_normally: bool,
}

impl<E: ProfilerEngine> ProfilerTaskState<E> {
    /// Stops the profiler and synchronously hands the active JFR file to the
    /// reporter via [`Reporter::report_blocking`].
    ///
    /// Returns `Ok(())` without calling the reporter when the active JFR file
    /// is empty.
    ///
    /// # Errors
    ///
    /// Fails when stopping the profiler fails, the profiler was not running,
    /// no JFR file is present, the file metadata cannot be read, a timestamp
    /// cannot be expressed relative to the Unix epoch, or the reporter
    /// returns an error (which is forwarded as its string representation).
    fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // `stop` yields the start time of the run that is being reported.
        let Some(started_at) = self.state.stop()? else {
            return Err("profiler was not running".into());
        };
        let Some(jfr_file) = self.state.jfr_file.as_ref() else {
            return Err("jfr file missing".into());
        };

        let jfr_path = jfr_file.active_path();
        let jfr_len = jfr_path.metadata()?.len();
        if jfr_len == 0 {
            // Nothing was recorded, so there is nothing to report.
            return Ok(());
        }

        let instance = self
            .agent_metadata
            .as_ref()
            .unwrap_or(&AgentMetadata::NoMetadata);
        let start = started_at.duration_since(UNIX_EPOCH)?;
        let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
        let metadata = ReportMetadata {
            instance,
            start,
            end,
            reporting_interval: self.reporting_interval,
        };

        // The reporter's error is not `Sync`, so it is carried over as text.
        if let Err(report_err) = self.reporter.report_blocking(&jfr_path, &metadata) {
            return Err(report_err.to_string().into());
        }
        Ok(())
    }
}

impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
    /// Attempts a final blocking report when the task state is dropped
    /// without having completed normally while the profiler is still started.
    /// A failure of that report is logged and otherwise ignored.
    fn drop(&mut self) {
        let cancelled_while_running = !self.completed_normally && self.state.is_started();
        if cancelled_while_running {
            tracing::info!("profiler task cancelled, attempting final report on drop");
            match self.try_final_report() {
                Ok(()) => {}
                Err(err) => tracing::warn!(?err, "failed to report on drop"),
            }
        }
    }
}

#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
Expand Down Expand Up @@ -1092,18 +1139,6 @@ impl Profiler {
}

fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
struct LogOnDrop;
impl Drop for LogOnDrop {
fn drop(&mut self) {
// Tokio will call destructors during runtime shutdown. Have something that will
// emit a log in that case
tracing::info!(
"unable to upload the last jfr due to Tokio runtime shutdown. \
Add a call to `RunningProfiler::stop` to wait for jfr upload to finish."
);
}
}

// Initialize async profiler - needs to be done once.
E::init_async_profiler()?;
tracing::info!("successfully initialized async profiler.");
Expand All @@ -1113,19 +1148,23 @@ impl Profiler {

// Get profiles at the configured interval rate.
let join_handle = tokio::spawn(async move {
let mut state = match ProfilerState::new(asprof, self.profiler_options) {
let state = match ProfilerState::new(asprof, self.profiler_options) {
Ok(state) => state,
Err(err) => {
tracing::error!(?err, "unable to create profiler state");
return;
}
};

// Lazily-loaded if not specified up front.
let mut agent_metadata = self.agent_metadata;
let mut done = false;
let mut task = ProfilerTaskState {
state,
reporter: self.reporter,
agent_metadata: self.agent_metadata,
reporting_interval: self.reporting_interval,
completed_normally: false,
};

let guard = LogOnDrop;
let mut done = false;
while !done {
// Wait until a timer or exit event
tokio::select! {
Expand All @@ -1145,10 +1184,10 @@ impl Profiler {
}

if let Err(err) = profiler_tick(
&mut state,
&mut agent_metadata,
&*self.reporter,
self.reporting_interval,
&mut task.state,
&mut task.agent_metadata,
&*task.reporter,
task.reporting_interval,
)
.await
{
Expand All @@ -1165,7 +1204,7 @@ impl Profiler {
}
}

mem::forget(guard);
task.completed_normally = true;
tracing::info!("profiling task finished");
});

Expand Down Expand Up @@ -1575,4 +1614,93 @@ mod tests {
bad => panic!("{bad:?}"),
};
}

/// A reporter that tracks both async and blocking reports separately.
struct BlockingMockReporter {
/// Channel that receives the payload of every async `report` call.
async_tx: tokio::sync::mpsc::Sender<String>,
/// Payloads collected from `report_blocking` calls, in call order.
blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
}
impl std::fmt::Debug for BlockingMockReporter {
    /// Formats the reporter as its type name only; no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BlockingMockReporter");
        builder.finish()
    }
}

#[async_trait::async_trait]
impl Reporter for BlockingMockReporter {
    /// Decodes the JFR bytes as UTF-8 and sends them on the async channel.
    /// Panics if the bytes are not valid UTF-8 or the channel send fails.
    async fn report(
        &self,
        jfr: Vec<u8>,
        _metadata: &ReportMetadata,
    ) -> Result<(), Box<dyn std::error::Error + Send>> {
        let payload = String::from_utf8(jfr).unwrap();
        self.async_tx.send(payload).await.unwrap();
        Ok(())
    }

    /// Reads the file at `jfr_path`, decodes it as UTF-8 and appends it to
    /// the list of blocking reports. A read failure is returned as an error.
    fn report_blocking(
        &self,
        jfr_path: &Path,
        _metadata: &ReportMetadata,
    ) -> Result<(), Box<dyn std::error::Error + Send>> {
        let bytes = match std::fs::read(jfr_path) {
            Ok(bytes) => bytes,
            Err(io_err) => return Err(Box::new(io_err)),
        };
        // Take the lock before decoding, matching the original evaluation order.
        let mut recorded = self.blocking_reports.lock().unwrap();
        recorded.push(String::from_utf8(bytes).unwrap());
        Ok(())
    }
}

/// Simulates a runtime shutdown while the profiler is running.
/// The profiler should call report_blocking on drop to flush the
/// last sample.
#[test]
fn test_profiler_report_on_drop() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    // Shared sink: one handle goes to the reporter, the other is inspected
    // after the runtime has been torn down.
    let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));
    let reporter_sink = Arc::clone(&blocking_reports);

    rt.block_on(async {
        let (report_tx, mut report_rx) = tokio::sync::mpsc::channel::<String>(10);
        let reporter = BlockingMockReporter {
            async_tx: report_tx,
            blocking_reports: reporter_sink,
        };
        let profiler = ProfilerBuilder::default()
            .with_reporter(reporter)
            .with_custom_agent_metadata(AgentMetadata::NoMetadata)
            .build();
        let engine = MockProfilerEngine {
            counter: AtomicU32::new(0),
        };

        // Detach so the stop channel doesn't trigger a graceful stop
        // when the block_on future returns.
        let running = profiler.spawn_inner::<MockProfilerEngine>(engine).unwrap();
        running.detach();

        // The first async report confirms the profiler is up and running.
        let first = report_rx.recv().await.unwrap();
        assert_eq!(first, "JFR0");
        // Fall through without stopping — dropping the runtime cancels the task.
    });

    // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
    drop(rt);

    let flushed = blocking_reports.lock().unwrap();
    assert_eq!(
        flushed.len(),
        1,
        "expected exactly one blocking report on drop"
    );
    assert_eq!(flushed[0], "JFR1");
}
}
51 changes: 44 additions & 7 deletions src/reporter/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use async_trait::async_trait;
use chrono::SecondsFormat;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use thiserror::Error;

Expand Down Expand Up @@ -61,18 +61,22 @@ impl LocalReporter {
}
}

/// Builds the output file name for a report: the current UTC time formatted
/// as RFC 3339 with second precision, with every `:` replaced by `-`,
/// followed by the `.jfr` extension.
fn jfr_file_name() -> String {
    let now = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
    let stamp = now
        .to_rfc3339_opts(SecondsFormat::Secs, true)
        .replace(':', "-");
    format!("{stamp}.jfr")
}

/// Writes the jfr file to disk.
///
/// The file is created inside `self.directory` under the name produced by
/// [`Self::jfr_file_name`]. The name is computed exactly once so that the
/// debug log and the file written always agree.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be written.
async fn report_profiling_data(
    &self,
    jfr: Vec<u8>,
    _metadata_obj: &ReportMetadata<'_>,
) -> Result<(), std::io::Error> {
    let file_name = Self::jfr_file_name();
    tracing::debug!("reporting {file_name}");
    tokio::fs::write(self.directory.join(file_name), jfr).await?;
    Ok(())
}
Expand All @@ -89,6 +93,18 @@ impl Reporter for LocalReporter {
.await
.map_err(|e| Box::new(LocalReporterError::IoError(e)) as _)
}

/// Synchronously copies the JFR file at `jfr_path` into the reporter's
/// directory under a freshly generated file name. I/O failures are wrapped
/// in [`LocalReporterError::IoError`].
fn report_blocking(
    &self,
    jfr_path: &Path,
    _metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
    let file_name = Self::jfr_file_name();
    tracing::debug!("reporting {file_name} (blocking)");
    let destination = self.directory.join(file_name);
    match std::fs::copy(jfr_path, destination) {
        // The number of bytes copied is not needed by callers.
        Ok(_copied) => Ok(()),
        Err(io_err) => Err(Box::new(LocalReporterError::IoError(io_err))
            as Box<dyn std::error::Error + Send>),
    }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -120,4 +136,25 @@ mod test {
.unwrap();
assert_eq!(tokio::fs::read(jfr_file.path()).await.unwrap(), b"JFR");
}

/// Checks that `LocalReporter::report_blocking` copies the source JFR file
/// into the output directory as a `.jfr` file with identical contents.
#[test]
fn test_local_reporter_reports_on_drop() {
    let dir = tempfile::tempdir().unwrap();
    let src = dir.path().join("input.jfr");
    std::fs::write(&src, b"JFR-DROP").unwrap();
    let out_dir = tempfile::tempdir().unwrap();
    let reporter = LocalReporter::new(out_dir.path());
    reporter.report_blocking(&src, &DUMMY_METADATA).unwrap();
    // Pick the first readable directory entry that carries a `.jfr` extension.
    let jfr_file = std::fs::read_dir(out_dir.path())
        .unwrap()
        .filter_map(Result::ok)
        .find(|f| {
            Path::new(&f.file_name())
                .extension()
                .is_some_and(|e| e == "jfr")
        })
        .unwrap();
    assert_eq!(std::fs::read(jfr_file.path()).unwrap(), b"JFR-DROP");
}
}
18 changes: 18 additions & 0 deletions src/reporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! 3. [multi::MultiReporter], which allows combining multiple reporters.

use std::fmt;
use std::path::Path;

use async_trait::async_trait;

Expand Down Expand Up @@ -38,4 +39,21 @@ pub trait Reporter: fmt::Debug {
jfr: Vec<u8>,
metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>>;

/// Synchronously report profiling data. Called during drop when the
/// async runtime is shutting down and async reporting is not possible.
///
/// `_jfr_path` is the path of the JFR file to report and `_metadata` is the
/// metadata that accompanies it; the default implementation ignores both.
///
/// The default implementation does not report anything: it only logs that
/// the last sample will be lost and returns `Ok(())`. Reporters that can
/// perform synchronous I/O (like [`local::LocalReporter`]) should override
/// this.
fn report_blocking(
&self,
_jfr_path: &Path,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
// Informational only: not supporting blocking reports is not an error.
tracing::info!(
"reporter does not support synchronous reporting, last sample will be lost. \
Add a call to `RunningProfiler::stop` to wait for the upload to finish."
);
Ok(())
}
}
23 changes: 23 additions & 0 deletions src/reporter/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::metadata::ReportMetadata;
use super::Reporter;

use std::fmt;
use std::path::Path;

/// An aggregated error that contains an error per reporter. A reporter is identified
/// by the result of its Debug impl.
Expand Down Expand Up @@ -109,6 +110,28 @@ impl Reporter for MultiReporter {
Err(Box::new(MultiError { errors }))
}
}

/// Calls `report_blocking` on every wrapped reporter, in order, without
/// stopping at the first failure.
///
/// # Errors
///
/// Returns a [`MultiError`] holding one entry per failed reporter, keyed by
/// that reporter's `Debug` output.
fn report_blocking(
    &self,
    jfr_path: &Path,
    metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
    let mut errors = Vec::new();
    for reporter in self.reporters.iter() {
        if let Err(failure) = reporter.report_blocking(jfr_path, metadata) {
            errors.push((format!("{reporter:?}"), failure));
        }
    }
    if !errors.is_empty() {
        return Err(Box::new(MultiError { errors }));
    }
    Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions tests/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ dir="profiles"
mkdir -p $dir
rm -f $dir/*.jfr

# test the "unable to upload the last jfr" logic
# test the drop-path final report logic
rm -rf $dir/short
mkdir $dir/short
./simple --local $dir/short --duration 1s --reporting-interval 10s --no-clean-stop >$dir/short/log
cat $dir/short/log
grep "unable to upload the last jfr" $dir/short/log
grep "profiler task cancelled, attempting final report on drop" $dir/short/log
rm -rf $dir/short

# Pass --worker-threads 16 to make the test much less flaky since there is always some worker thread running
Expand Down
Loading