Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 67 additions & 51 deletions crates/rustapi-extras/src/insight/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,73 +222,89 @@ pub struct WebhookExporter {
config: WebhookConfig,
buffer: Arc<Mutex<Vec<InsightData>>>,
#[cfg(feature = "webhook")]
client: reqwest::Client,
sender: tokio::sync::mpsc::Sender<Vec<InsightData>>,
#[cfg(not(feature = "webhook"))]
_marker: std::marker::PhantomData<()>,
}

impl WebhookExporter {
/// Create a new webhook exporter.
pub fn new(config: WebhookConfig) -> Self {
#[cfg(feature = "webhook")]
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(config.timeout_secs))
.build()
.expect("Failed to build HTTP client");
{
// Allow buffering up to 100 batches before dropping
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<InsightData>>(100);
let config_clone = config.clone();

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(config_clone.timeout_secs))
.build()
.expect("Failed to build HTTP client");

rt.block_on(async move {
while let Some(insights) = rx.recv().await {
let mut request = client.post(&config_clone.url).json(&insights);

if let Some(ref auth_value) = config_clone.auth_header {
request = request.header("Authorization", auth_value);
}

// Add custom headers
for (k, v) in &config_clone.headers {
request = request.header(k, v);
}

match request.send().await {
Ok(response) => {
if !response.status().is_success() {
tracing::error!(
"Webhook returned status {}",
response.status()
);
}
}
Err(e) => {
tracing::error!("Webhook error: {}", e);
}
}
}
});
});

Self {
config,
buffer: Arc::new(Mutex::new(Vec::new())),
sender: tx,
}
}

#[cfg(not(feature = "webhook"))]
Self {
config,
buffer: Arc::new(Mutex::new(Vec::new())),
#[cfg(feature = "webhook")]
client,
_marker: std::marker::PhantomData,
}
}

/// Send insights to the webhook.
#[cfg(feature = "webhook")]
fn send_insights(&self, insights: &[InsightData]) -> ExportResult<()> {
use std::sync::mpsc;

// Use a channel to get the result from the async context
let (tx, rx) = mpsc::channel();
let client = self.client.clone();
let url = self.config.url.clone();
let auth = self.config.auth_header.clone();
let insights = insights.to_vec();

// Spawn a blocking task to run the async request
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let result = rt.block_on(async {
let mut request = client.post(&url).json(&insights);

if let Some(auth_value) = auth {
request = request.header("Authorization", auth_value);
}

match request.send().await {
Ok(response) => {
if response.status().is_success() {
Ok(())
} else {
Err(ExportError::Unavailable(format!(
"Webhook returned status {}",
response.status()
)))
}
}
Err(e) => Err(ExportError::Unavailable(e.to_string())),
}
});

let _ = tx.send(result);
});

// Wait for the result with timeout
rx.recv_timeout(std::time::Duration::from_secs(self.config.timeout_secs + 1))
.map_err(|_| ExportError::Unavailable("Webhook request timed out".to_string()))?
match self.sender.try_send(insights.to_vec()) {
Ok(_) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("Webhook exporter channel full, dropping batch");
Ok(())
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err(
ExportError::Unavailable("Webhook worker channel closed".to_string()),
),
}
}

/// Send insights to the webhook (stub when webhook feature is disabled).
Expand Down
55 changes: 55 additions & 0 deletions crates/rustapi-extras/tests/webhook_performance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#![cfg(feature = "webhook")]

use rustapi_extras::insight::export::{InsightExporter, WebhookConfig, WebhookExporter};
use rustapi_extras::insight::InsightData;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::test]
async fn test_webhook_blocking_behavior() {
// Start a dummy server that sleeps
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
loop {
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let mut buf = [0; 1024];
let _ = socket.read(&mut buf).await;
// Simulate slow processing
tokio::time::sleep(Duration::from_millis(500)).await;
let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
let _ = socket.write_all(response.as_bytes()).await;
});
}
});

// Configure exporter with batch size 1 to trigger send immediately
let config = WebhookConfig::new(format!("http://{}", addr))
.batch_size(1)
.timeout(2);

let exporter = WebhookExporter::new(config);
let insight = InsightData::new("test", "GET", "/");

let start = Instant::now();
// This should trigger a send because batch_size is 1.
// In current implementation, it blocks waiting for response.
match exporter.export(&insight) {
Ok(_) => println!("Export successful"),
Err(e) => println!("Export failed: {:?}", e),
}
let duration = start.elapsed();

println!("Export took: {:?}", duration);

// If it blocks, it should take at least 500ms (due to 500ms server sleep or timeout)
if duration.as_millis() >= 400 {
panic!(
"Performance regression: Export blocked for {:?}. Expected non-blocking behavior.",
duration
);
}
}
Loading