Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ url = { version = "2", features = ["serde"] }
axum = { version = "0.7", features = ["macros"] }
tower = { version = "0.5", features = ["full", "util"] }
tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip", "request-id"] }
tower_governor = "0.4"

# Async runtime
tokio = { version = "1", features = ["full"] }
Expand All @@ -36,11 +37,17 @@ redis = { version = "0.27", features = ["tokio-comp", "json", "connection-manage
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
schemars = "0.8"

# Observability
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
opentelemetry = { version = "0.24", features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.17", features = ["trace", "grpc-tonic"] }
opentelemetry-semantic-conventions = "0.16"
opentelemetry_sdk = { version = "0.24", features = ["trace", "rt-tokio"] }
tracing-opentelemetry = "0.25"
tonic = "0.12"

# OpenTelemetry (Upgrade to 0.31+ from main branch)
opentelemetry = { version = "0.31", features = ["trace"] }
Expand All @@ -53,23 +60,28 @@ opentelemetry-semantic-conventions = "0.16"
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
dotenvy = "0.15"
thiserror = "1.0"
thiserror = "1"
anyhow = "1.0"
config = "0.14.0"
arc-swap = "1.7"
async-trait = "0.1"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
base64 = "0.22"
validator = { version = "0.19", features = ["derive"] }
rust_decimal = { version = "1.35", features = ["serde"] }

# Stellar
stellar-xdr = { version = "21.0", features = ["std"] }

# External Integrations
# API documentation
utoipa = { version = "5.0", features = ["axum_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "8.0", features = ["axum"] }

# Background jobs
apalis = { version = "0.6" }
apalis-redis = "0.6"
rust_decimal = { version = "1.35", features = ["serde"] }
stellar-xdr = { version = "21.0", features = ["std"] }
base64 = "0.22"
validator = { version = "0.19", features = ["derive"] }
tower_governor = "0.4"

# Optional: mock support for tests
mockall = { version = "0.13", optional = true }
tonic = "0.12"

# Scheduler
tokio-util = "0.7"
Expand All @@ -78,22 +90,29 @@ async-trait = "0.1"
arc-swap = "1.7"

[dev-dependencies]
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5", features = ["trace"] }
hyper = { version = "1.0", features = ["full"] }
mime = "0.3"
tokio = { version = "1", features = ["full", "test-util"] }
reqwest = { version = "0.12", features = ["json"] }
tokio-test = "0.4"
testcontainers = "0.16"
wiremock = "0.6"
hyper = { version = "1.0", features = ["full"] }
mime = "0.3"
arc-swap = "1.7"
async-trait = "0.1"
mockall = "0.13"
rust_decimal_macros = "1.35"
criterion = { version = "0.5", features = ["async_tokio"] }
temp-env = "0.3.6"
toml = "0.8.12"
http-body-util = "0.1"

[profile.release]
opt-level = 3
lto = true
codegen-units = 1
strip = true

[[bench]]
name = "performance"
harness = false

[[bench]]
name = "dashboard_bench"
harness = false
190 changes: 106 additions & 84 deletions backend/src/api/handlers/profiling.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::api::contracts::{
ApiResponse, ProfileTriggerRequest, ProfileTriggerResponse, SystemStatus, ValidatedJson,
};
use crate::config::reload::ConfigManager;
use crate::services::{
error_recovery::ErrorManager, log_aggregator::LogAggregator, sys_metrics::MetricsExporter,
tracing::TracingService,
};
//! Performance profiling and system health API handlers.
//!
//! Provides endpoints for monitoring application health, collecting system
//! metrics, and triggering profiling runs.

use axum::{extract::State, response::IntoResponse, Json};
use chrono::{DateTime, Utc};
use redis::Client as RedisClient;
Expand All @@ -15,44 +12,79 @@ use std::sync::Arc;
use tracing::{info, info_span, instrument};
use utoipa::ToSchema;

use crate::api::contracts::{
ApiResponse, ProfileTriggerRequest, ProfileTriggerResponse, SystemStatus, ValidatedJson,
};
use crate::config::reload::ConfigManager;
use crate::error::AppError;
use crate::services::{
error_recovery::ErrorManager,
log_aggregator::LogAggregator,
sys_metrics::MetricsExporter,
tracing::TracingService,
};
use redis::Client as RedisClient;

// ---------------------------------------------------------------------------
// Shared application state
// ---------------------------------------------------------------------------

/// Shared application state passed to profiling and status handlers.
pub struct AppState {
/// Optional PostgreSQL connection pool (None in tests).
pub db: Option<sqlx::PgPool>,
/// System metrics exporter.
pub metrics_exporter: Arc<MetricsExporter>,
/// Error recovery manager.
pub error_manager: Arc<ErrorManager>,
/// Hot-reloadable configuration manager.
pub config_manager: Arc<ConfigManager>,
/// Async log aggregation pipeline.
pub log_aggregator: Arc<LogAggregator>,
/// Redis client for caching.
pub redis: RedisClient,
}

// ---------------------------------------------------------------------------
// Response types
// ---------------------------------------------------------------------------

/// Detailed performance metrics report.
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub struct MetricsReport {
/// Total system uptime in seconds
/// Total system uptime in seconds.
pub uptime_secs: u64,
/// Current resident set size (RSS) in bytes
/// Current resident set size (RSS) in bytes.
pub memory_usage_bytes: u64,
/// Number of currently active HTTP requests
/// Number of currently active HTTP requests.
pub active_requests: u32,
/// Percentage of failed requests in the last window
/// Percentage of failed requests in the last window.
pub error_rate: f64,
/// Current latency for Stellar ledger ingestion in milliseconds
/// Current latency for Stellar ledger ingestion in milliseconds.
pub ledger_ingestion_latency_ms: u32,
}

/// System health check response.
#[derive(Debug, Serialize, ToSchema)]
pub struct HealthResponse {
/// Overall health status (e.g., 'healthy' or 'degraded')
/// Overall health status (e.g., `"healthy"` or `"degraded"`).
pub status: String,
/// The current version of the backend service
/// The current version of the backend service.
pub version: String,
/// RFC3339 timestamp of the health check
/// RFC3339 timestamp of the health check.
pub timestamp: DateTime<Utc>,
/// Connectivity status to the PostgreSQL database
/// Connectivity status to the PostgreSQL database.
pub database_connected: bool,
/// Connectivity status to the Redis cache
/// Connectivity status to the Redis cache.
pub redis_connected: bool,
}

/// Handler for retrieving detailed performance metrics.
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------

/// `GET /api/v1/profiling/metrics` — retrieve detailed performance metrics.
///
/// Optimized for consumption by monitoring tools like Grafana.
#[utoipa::path(
get,
Expand All @@ -67,16 +99,10 @@ pub struct HealthResponse {
pub async fn get_metrics(
State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, AppError> {
let span = info_span!("metrics.collection");
let _enter = span.enter();

info!("Collecting performance metrics");


// Instrument the metrics exporter call
let metrics_span = TracingService::service_method_span("MetricsExporter", "get_metrics");
let _metrics_enter = metrics_span.enter();

let sys_metrics = state.metrics_exporter.get_metrics().await;
drop(_metrics_enter);

Expand All @@ -91,14 +117,14 @@ pub async fn get_metrics(
info!(
uptime = sys_metrics.uptime,
memory = sys_metrics.memory_usage,
active_requests = 12,
"Metrics collected successfully"
);

Ok(Json(report))
}

/// Handler for system health checks.
/// `GET /api/v1/profiling/health` — system health check.
///
/// Performs actual pings to downstream services.
#[utoipa::path(
get,
Expand All @@ -110,25 +136,27 @@ pub async fn get_metrics(
tag = "profiling"
)]
#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/v1/profiling/health"))]
pub async fn get_health(State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, AppError> {
let span = info_span!("health.check");
let _enter = span.enter();

pub async fn get_health(
State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, AppError> {
info!("Performing system health check");

// Check database connectivity with tracing
let db_span = TracingService::db_query_span("SELECT 1", "postgres", "PING");
let _db_enter = db_span.enter();

let db_healthy = sqlx::query("SELECT 1")
.fetch_optional(&state.db)
.await
.map(|result| result.is_some())
.unwrap_or_else(|e| {
TracingService::record_error(&db_span, &e.to_string(), "database");
false
});
drop(_db_enter);
let db_healthy = if let Some(ref pool) = state.db {
let db_span = TracingService::db_query_span("SELECT 1", "postgres", "PING");
let _db_enter = db_span.enter();
let result = sqlx::query("SELECT 1")
.fetch_optional(pool)
.await
.map(|r| r.is_some())
.unwrap_or_else(|e| {
TracingService::record_error(&db_span, &e.to_string(), "database");
false
});
drop(_db_enter);
result
} else {
false
};

let response = HealthResponse {
status: if db_healthy { "healthy" } else { "degraded" }.to_string(),
Expand All @@ -147,28 +175,24 @@ pub async fn get_health(State(state): State<Arc<AppState>>) -> Result<impl IntoR
Ok(Json(response))
}

/// Handler for Prometheus-compatible metrics.
/// `GET /api/v1/profiling/prometheus` — Prometheus-compatible metrics.
#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/v1/profiling/prometheus"))]
pub async fn get_prometheus_metrics() -> impl IntoResponse {
let span = info_span!("prometheus.metrics.export");
let _enter = span.enter();

info!("Exporting Prometheus-format metrics");
let metrics = "# HELP backend_requests_total Total number of requests\n\
# TYPE backend_requests_total counter\n\
backend_requests_total 1024\n\
# HELP backend_ledger_latency_ms Current ledger ingestion latency\n\
# TYPE backend_ledger_latency_ms gauge\n\
backend_ledger_latency_ms 120\n";
metrics.to_string()
"# HELP backend_requests_total Total number of requests\n\
# TYPE backend_requests_total counter\n\
backend_requests_total 1024\n\
# HELP backend_ledger_latency_ms Current ledger ingestion latency\n\
# TYPE backend_ledger_latency_ms gauge\n\
backend_ledger_latency_ms 120\n"
.to_string()
}

/// Handler for detailed system status
/// `GET /api/status` — detailed system status.
#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/status"))]
pub async fn get_system_status(State(state): State<Arc<AppState>>) -> ApiResponse<SystemStatus> {
let span = info_span!("system.status");
let _enter = span.enter();

pub async fn get_system_status(
State(state): State<Arc<AppState>>,
) -> ApiResponse<SystemStatus> {
info!("Retrieving system status");

let metrics_span = TracingService::service_method_span("MetricsExporter", "get_metrics");
Expand All @@ -189,36 +213,34 @@ pub async fn get_system_status(State(state): State<Arc<AppState>>) -> ApiRespons
})
}

/// Handler to trigger profile collection (CPU, memory profiling)
/// `POST /api/profile` — trigger a profiling collection run.
#[utoipa::path(
post,
path = "/api/profile",
responses(
(status = 200, description = "Profiling collection triggered"),
(status = 400, description = "Invalid request parameters")
),
tag = "profiling"
)]
#[instrument(skip_all, fields(http.method = "POST", http.route = "/api/profile"))]
pub async fn trigger_profile_collection(
State(_state): State<Arc<AppState>>,
ValidatedJson(payload): ValidatedJson<ProfileTriggerRequest>,
) -> Result<ApiResponse<ProfileTriggerResponse>, AppError> {
// In a real implementation, this would trigger a CPU/Memory profile
// using the provided payload (duration, sample rate, etc.)

// Validate duration doesn't cause overflow in chrono::Duration (Issue #208)
// chrono::Duration::seconds() accepts i64, so we need to ensure payload.duration_secs <= i64::MAX
if payload.duration_secs > i64::MAX as u32 {
return Err(AppError::BadRequest(format!("Invalid duration_secs (Issue #208): too large for time calculation, maximum {}", i64::MAX)));
}
// Additional safety check for chrono::Duration::seconds() bounds
if payload.duration_secs > 2_147_483_647 {
return Err(AppError::BadRequest(format!("Invalid duration_secs (Issue #208): exceeds safe bounds for chrono::Duration::seconds(), maximum 2,147,483,647, got {}", payload.duration_secs)));
}

) -> ApiResponse<ProfileTriggerResponse> {
let profile_id = uuid::Uuid::new_v4();
let message = format!(
"Profiling collection triggered for label: {}",
payload.label

info!(
profile_id = %profile_id,
label = %payload.label,
duration_secs = payload.duration_secs,
"Profiling collection triggered"
);
let estimated_completion = chrono::Utc::now()
+ chrono::Duration::seconds(payload.duration_secs as i64);

Ok(ApiResponse::new(ProfileTriggerResponse {
ApiResponse::new(ProfileTriggerResponse {
profile_id,
message,
estimated_completion,
}))
message: format!("Profiling collection triggered for label: {}", payload.label),
estimated_completion: chrono::Utc::now()
+ chrono::Duration::seconds(payload.duration_secs as i64),
})
}
Loading