From 69caa37e33d4b4ad426557f8bccfe1043b798dd9 Mon Sep 17 00:00:00 2001 From: malik203 <72547228+malik203@users.noreply.github.com> Date: Wed, 27 May 2026 23:21:01 +0000 Subject: [PATCH] feat: implement config hot-reload and fix corrupted backend files - Add ConfigManager (ArcSwap-based, patch-capable) in config/reload.rs - Add ConfigWatcher (Redis pub/sub driven reload) with ConfigHandle - Add ReloadError, handle_reload and handle_get_config Axum handlers - Move AppConfig to config/mod.rs with ServerConfig, DatabaseConfig, RedisConfig - Fix corrupted concatenated duplicates in Cargo.toml, lib.rs, error.rs, jobs.rs, telemetry.rs, services/mod.rs, api/mod.rs, handlers/mod.rs, middleware/mod.rs, profiling.rs, dashboard.rs --- backend/Cargo.toml | 146 ++----- backend/src/api/handlers/dashboard.rs | 412 +++----------------- backend/src/api/handlers/profiling.rs | 206 +++------- backend/src/api/mod.rs | 2 +- backend/src/config/mod.rs | 113 ++++-- backend/src/config/reload.rs | 536 ++++++++++++-------------- backend/src/error.rs | 124 +----- backend/src/jobs.rs | 10 +- backend/src/lib.rs | 5 +- backend/src/services/mod.rs | 3 +- backend/src/telemetry.rs | 1 - 11 files changed, 486 insertions(+), 1072 deletions(-) diff --git a/backend/Cargo.toml b/backend/Cargo.toml index c661a3e..cfca915 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -2,32 +2,28 @@ name = "backend" version = "0.1.0" edition = "2021" - -[dependencies] -axum = "0.7" -tokio = { version = "1", features = ["full"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" -sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid"] } -redis = { version = "0.25", features = ["tokio-comp"] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -thiserror = "1.0" -chrono = { version = "0.4", features = ["serde"] } -uuid = { version = "1", features = ["v4", "serde"] } -dotenvy = "0.15" -tower-http = { version = "0.5", features = ["trace"] } -name = "crucible-backend" -version = "0.1.0" -edition = "2021" description = "Backend API server for the Crucible smart contract testing platform" license = "MIT" -authors = ["Crucible Contributors"] [[bin]] -name = "crucible-backend" +name = "backend" path = "src/main.rs" +[[bin]] +name = "backup" +path = "src/bin/backup.rs" + +[[bench]] +name = "performance" +harness = false + +[[bench]] +name = "dashboard_bench" +harness = false + +[features] +testutils = ["mockall"] + [dependencies] # Web framework axum = { version = "0.7", features = ["macros"] } @@ -36,6 +32,7 @@ tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip", # Async runtime tokio = { version = "1", features = ["full"] } +futures-util = { version = "0.3", default-features = false, features = ["std"] } # Database sqlx = { version = "0.7", features = [ @@ -57,114 +54,45 @@ serde_json = "1" # Observability tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +opentelemetry = { version = "0.24", features = ["trace"] } +opentelemetry-otlp = { version = "0.17", features = ["trace", "grpc-tonic"] } +opentelemetry_sdk = { version = "0.24", features = ["trace", "rt-tokio"] } +tracing-opentelemetry = "0.25" +tonic = "0.12" # Utilities uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" thiserror = "1" - -[dev-dependencies] -# Testing -reqwest = { version = "0.12", features = ["json"] } -tokio-test = "0.4" -testcontainers = "0.16" -wiremock = "0.6" - -[profile.release] -opt-level = 3 -lto = true -codegen-units = 1 -strip = true - -[dependencies] -axum = "0.7" -sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "macros"] } -redis = { version = "0.25", features = ["tokio-comp"] } -tokio = { version = "1.0", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -schemars = "0.8" -tracing = "0.1" -tracing-subscriber = "0.3" - -[dev-dependencies] -tower = "0.4" -name = "backend" -version = "0.1.0" -edition = "2021" - -[[bin]] -name = "backup" -path = "src/bin/backup.rs" -[features] -testutils = ["mockall"] - -[dependencies] -axum = "0.7" -tokio = { version = "1", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono", "uuid"] } -redis = { version = "0.24", features = ["tokio-comp", "json"] } -sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "json"] } -redis = { version = "0.27", features = ["tokio-comp", "json"] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -anyhow = "1.0" -thiserror = "1.0" -chrono = { version = "0.4", features = ["serde"] } -uuid = { version = "1.0", features = ["v4", "serde"] } -tower = { version = "0.5", features = ["util"] } -tower-http = { version = "0.5", features = ["trace"] } - -[dev-dependencies] -tower = { version = "0.5", features = ["util"] } -hyper = { version = "1.0", features = ["full"] } -mime = "0.3" -tokio = { version = "1", features = ["full", "test-util"] } +anyhow = "1" arc-swap = "1.7" async-trait = "0.1" -dotenvy = "0.15" +base64 = "0.22" +validator = { version = "0.19", features = ["derive"] } +rust_decimal = { version = "1.35", features = ["serde"] } +stellar-xdr = { version = "21.0", features = ["std"] } utoipa = { version = "5.0", features = ["axum_extras", "chrono", "uuid"] } utoipa-swagger-ui = { version = "8.0", features = ["axum"] } apalis = { version = "0.6" } apalis-redis = "0.6" -rust_decimal = { version = "1.35", features = ["serde"] } -stellar-xdr = { version = "21.0", features = ["std"] } -base64 = "0.22" -validator = { version = "0.19", features = ["derive"] } -tower-http = { version = "0.5", features = ["cors", "trace"] } tower_governor = "0.4" mockall = { version = "0.13", optional = true } -opentelemetry = { version = "0.31", features = ["trace"] } -opentelemetry_sdk = { version = "0.31", features = ["trace", "rt-tokio"] } -opentelemetry-otlp = { version = "0.31", default-features = false, features = ["trace", "http-proto", "reqwest-client"] } -tracing-opentelemetry = { version = "0.32", default-features = false } -futures-util = { version = "0.3", default-features = false, features = ["std"] } -# OpenTelemetry and tracing instrumentation -opentelemetry = { version = "0.24", features = ["trace", "metrics"] } -opentelemetry-otlp = { version = "0.17", features = ["trace", "grpc-tonic"] } -opentelemetry-semantic-conventions = "0.16" -opentelemetry_sdk = { version = "0.24", features = ["trace", "rt-tokio"] } -tracing-opentelemetry = "0.25" -tonic = "0.12" [dev-dependencies] tower = { version = "0.4", features = ["util"] } tower-http = { version = "0.5", features = ["trace"] } -rust_decimal_macros = "1.35" -criterion = { version = "0.5", features = ["async_tokio"] } hyper = { version = "1.0", features = ["full"] } mime = "0.3" +tokio = { version = "1", features = ["full", "test-util"] } +reqwest = { version = "0.12", features = ["json"] } +tokio-test = "0.4" mockall = "0.13" -mockall = "0.12" - -[[bench]] -name = "performance" -harness = false - -[[bench]] -name = "dashboard_bench" -harness = false +rust_decimal_macros = "1.35" +criterion = { version = "0.5", features = ["async_tokio"] } +[profile.release] +opt-level = 3 +lto = true +codegen-units = 1 +strip = true diff --git a/backend/src/api/handlers/dashboard.rs b/backend/src/api/handlers/dashboard.rs index 4f39154..72caf7c 100644 --- a/backend/src/api/handlers/dashboard.rs +++ b/backend/src/api/handlers/dashboard.rs @@ -1,51 +1,55 @@ -use axum::{Json, response::IntoResponse, extract::{State, Path}}; -use serde::{Serialize, Deserialize}; -use tracing::{info, instrument, error}; -use chrono::{DateTime, Utc}; -use crate::error::AppError; -use utoipa::ToSchema; +//! Dashboard metrics handlers. +//! +//! Provides endpoints for aggregated contract and transaction metrics, +//! with Redis caching to reduce database load. + use std::sync::Arc; -use sqlx::PgPool; + +use axum::{ + extract::{Path, State}, + response::IntoResponse, + Json, +}; +use chrono::{DateTime, Utc}; use redis::AsyncCommands; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use tracing::{error, info, instrument}; +use utoipa::ToSchema; + +use crate::error::AppError; -/// Shared application state for dashboard handlers +/// Shared application state for dashboard handlers. pub struct DashboardState { pub db: PgPool, pub redis: redis::aio::ConnectionManager, } +/// Aggregated dashboard metrics. #[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct DashboardMetrics { - /// Total number of active contracts pub total_contracts: i64, - /// Total number of transactions processed pub total_transactions: i64, - /// Average transaction processing time in milliseconds pub avg_processing_time_ms: f64, - /// Number of failed transactions in the last 24 hours pub failed_transactions_24h: i64, - /// Timestamp of the metrics snapshot pub timestamp: DateTime, } +/// Per-contract statistics. #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct ContractStats { - /// Contract identifier pub contract_id: String, - /// Number of invocations pub invocation_count: i64, - /// Last invocation timestamp pub last_invoked: Option>, - /// Average gas cost pub avg_gas_cost: f64, } -/// Retrieves aggregated dashboard metrics with Redis caching +/// `GET /api/v1/dashboard/metrics` — Aggregated dashboard metrics with Redis caching. #[utoipa::path( get, path = "/api/v1/dashboard/metrics", responses( - (status = 200, description = "Dashboard metrics retrieved successfully", body = DashboardMetrics), + (status = 200, description = "Dashboard metrics", body = DashboardMetrics), (status = 500, description = "Internal server error") ), tag = "dashboard" @@ -56,42 +60,37 @@ pub async fn get_dashboard_metrics( ) -> Result { info!("Fetching dashboard metrics"); - // Try cache first - let cache_key = "dashboard:metrics"; + const CACHE_KEY: &str = "dashboard:metrics"; let mut redis_conn = state.redis.clone(); - - if let Ok(cached) = redis_conn.get::<_, String>(cache_key).await { + + // Try cache first. + if let Ok(cached) = redis_conn.get::<_, String>(CACHE_KEY).await { if let Ok(metrics) = serde_json::from_str::(&cached) { info!("Returning cached dashboard metrics"); return Ok(Json(metrics)); } } - // Fetch from database - let total_contracts = sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM contracts" - ) - .fetch_optional(&state.db) - .await? - .unwrap_or(0); + let total_contracts = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM contracts") + .fetch_optional(&state.db) + .await? + .unwrap_or(0); - let total_transactions = sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM transactions" - ) - .fetch_optional(&state.db) - .await? - .unwrap_or(0); + let total_transactions = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM transactions") + .fetch_optional(&state.db) + .await? + .unwrap_or(0); let avg_processing_time = sqlx::query_scalar::<_, Option>( - "SELECT AVG(processing_time_ms) FROM transactions WHERE processing_time_ms IS NOT NULL" + "SELECT AVG(processing_time_ms) FROM transactions WHERE processing_time_ms IS NOT NULL", ) .fetch_one(&state.db) .await? .unwrap_or(0.0); let failed_24h = sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM transactions - WHERE status = 'failed' AND created_at > NOW() - INTERVAL '24 hours'" + "SELECT COUNT(*) FROM transactions \ + WHERE status = 'failed' AND created_at > NOW() - INTERVAL '24 hours'", ) .fetch_optional(&state.db) .await? @@ -105,9 +104,9 @@ pub async fn get_dashboard_metrics( timestamp: Utc::now(), }; - // Cache for 60 seconds + // Cache for 60 seconds (best-effort). if let Ok(json) = serde_json::to_string(&metrics) { - let _: Result<(), _> = redis_conn.set_ex(cache_key, json, 60).await; + let _: Result<(), _> = redis_conn.set_ex(CACHE_KEY, json, 60).await; } info!( @@ -119,15 +118,13 @@ pub async fn get_dashboard_metrics( Ok(Json(metrics)) } -/// Retrieves statistics for a specific contract +/// `GET /api/v1/dashboard/contracts/:contract_id/stats` — Per-contract statistics. #[utoipa::path( get, path = "/api/v1/dashboard/contracts/{contract_id}/stats", - params( - ("contract_id" = String, Path, description = "Contract identifier") - ), + params(("contract_id" = String, Path, description = "Contract identifier")), responses( - (status = 200, description = "Contract statistics retrieved", body = ContractStats), + (status = 200, description = "Contract statistics", body = ContractStats), (status = 404, description = "Contract not found"), (status = 500, description = "Internal server error") ), @@ -143,17 +140,15 @@ pub async fn get_contract_stats( let cache_key = format!("dashboard:contract:{}:stats", contract_id); let mut redis_conn = state.redis.clone(); - // Check cache if let Ok(cached) = redis_conn.get::<_, String>(&cache_key).await { if let Ok(stats) = serde_json::from_str::(&cached) { return Ok(Json(stats)); } } - // Query database let result = sqlx::query!( r#" - SELECT + SELECT COUNT(*) as "invocation_count!", MAX(created_at) as last_invoked, AVG(gas_cost) as avg_gas_cost @@ -174,11 +169,13 @@ pub async fn get_contract_stats( }, _ => { error!(contract_id = %contract_id, "Contract not found"); - return Err(AppError::NotFound(format!("Contract {} not found", contract_id))); + return Err(AppError::NotFound(format!( + "Contract {} not found", + contract_id + ))); } }; - // Cache for 30 seconds if let Ok(json) = serde_json::to_string(&stats) { let _: Result<(), _> = redis_conn.set_ex(&cache_key, json, 30).await; } @@ -199,12 +196,10 @@ mod tests { failed_transactions_24h: 3, timestamp: Utc::now(), }; - let json = serde_json::to_string(&metrics).unwrap(); - let deserialized: DashboardMetrics = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.total_contracts, 100); - assert_eq!(deserialized.total_transactions, 5000); + let back: DashboardMetrics = serde_json::from_str(&json).unwrap(); + assert_eq!(back.total_contracts, 100); + assert_eq!(back.total_transactions, 5000); } #[test] @@ -215,308 +210,9 @@ mod tests { last_invoked: Some(Utc::now()), avg_gas_cost: 1500.75, }; - let json = serde_json::to_string(&stats).unwrap(); - let deserialized: ContractStats = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.contract_id, "test_contract_123"); - assert_eq!(deserialized.invocation_count, 42); -//! Dashboard data API handler. -//! -//! Provides a single `GET /api/dashboard` endpoint that aggregates system -//! metrics, active recovery tasks, and active alerts into one response. -//! Results are cached in Redis for [`CACHE_TTL_SECS`] seconds to reduce -//! load on downstream services. -//! -//! # Example -//! ```rust,no_run -//! use std::sync::Arc; -//! use axum::{Router, routing::get}; -//! use backend::api::handlers::dashboard::{DashboardState, get_dashboard}; -//! -//! # async fn example() { -//! // state is constructed with your real service instances -//! # } -//! ``` - -use axum::{extract::State, response::IntoResponse, Json}; -use redis::{AsyncCommands, Client as RedisClient}; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use thiserror::Error; -use tracing::{debug, error, warn}; - -use crate::services::{ - error_recovery::{ErrorManager, RecoveryTask}, - log_alerts::{Alert, AlertManager}, - sys_metrics::{MetricsExporter, SystemMetrics}, -}; - -// --------------------------------------------------------------------------- -// Constants -// --------------------------------------------------------------------------- - -const CACHE_KEY: &str = "dashboard:summary"; -const CACHE_TTL_SECS: u64 = 30; - -// --------------------------------------------------------------------------- -// Error type -// --------------------------------------------------------------------------- - -/// Errors that can occur while building the dashboard response. -#[derive(Debug, Error)] -pub enum DashboardError { - /// A Redis error occurred. - #[error("Cache error: {0}")] - Cache(#[from] redis::RedisError), - - /// JSON serialization/deserialization failed. - #[error("Serialization error: {0}")] - Serialization(#[from] serde_json::Error), -} - -impl IntoResponse for DashboardError { - fn into_response(self) -> axum::response::Response { - error!(error = %self, "Dashboard handler error"); - let body = serde_json::json!({ "error": self.to_string() }); - (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response() - } -} - -// --------------------------------------------------------------------------- -// Response types -// --------------------------------------------------------------------------- - -/// Aggregated dashboard data returned by `GET /api/dashboard`. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DashboardData { - /// Current system metrics snapshot. - pub metrics: SystemMetrics, - /// Recovery tasks that are currently active. - pub active_recovery_tasks: Vec, - /// Alerts that have fired and not yet been resolved. - pub active_alerts: Vec, -} - -// --------------------------------------------------------------------------- -// State -// --------------------------------------------------------------------------- - -/// Shared application state for the dashboard handler. -pub struct DashboardState { - pub metrics_exporter: Arc, - pub error_manager: Arc, - pub alert_manager: Arc, - pub redis: RedisClient, -} - -// --------------------------------------------------------------------------- -// Handler -// --------------------------------------------------------------------------- - -/// `GET /api/dashboard` — return aggregated dashboard data. -/// -/// Attempts to serve a cached response from Redis. On cache miss (or cache -/// error) the data is assembled from the live services and the cache is -/// populated before responding. -#[tracing::instrument(skip(state))] -pub async fn get_dashboard( - State(state): State>, -) -> Result { - // --- try cache --- - match try_cache_get(&state.redis).await { - Ok(Some(cached)) => { - debug!("Dashboard cache hit"); - return Ok(Json(cached)); - } - Ok(None) => debug!("Dashboard cache miss"), - Err(e) => warn!(error = %e, "Dashboard cache read failed; falling back to live data"), - } - - // --- assemble live data --- - let (metrics, active_recovery_tasks, active_alerts) = tokio::join!( - state.metrics_exporter.get_metrics(), - state.error_manager.get_active_tasks(), - state.alert_manager.get_active_alerts(), - ); - - let data = DashboardData { - metrics, - active_recovery_tasks, - active_alerts, - }; - - // --- populate cache (best-effort) --- - if let Err(e) = try_cache_set(&state.redis, &data).await { - warn!(error = %e, "Failed to populate dashboard cache"); - } - - Ok(Json(data)) -} - -// --------------------------------------------------------------------------- -// Cache helpers -// --------------------------------------------------------------------------- - -async fn try_cache_get(redis: &RedisClient) -> Result, DashboardError> { - let mut conn = redis.get_multiplexed_async_connection().await?; - let raw: Option = conn.get(CACHE_KEY).await?; - match raw { - Some(s) => Ok(Some(serde_json::from_str(&s)?)), - None => Ok(None), - } -} - -async fn try_cache_set(redis: &RedisClient, data: &DashboardData) -> Result<(), DashboardError> { - let serialized = serde_json::to_string(data)?; - let mut conn = redis.get_multiplexed_async_connection().await?; - let _: () = conn.set_ex(CACHE_KEY, serialized, CACHE_TTL_SECS).await?; - Ok(()) -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - use axum::{body::Body, http::Request, routing::get, Router}; - use tower::ServiceExt; - - fn make_state() -> Arc { - Arc::new(DashboardState { - metrics_exporter: Arc::new(MetricsExporter::new()), - error_manager: Arc::new(ErrorManager::new()), - alert_manager: Arc::new(AlertManager::new()), - // Use a URL that will fail to connect — the handler degrades gracefully. - redis: RedisClient::open("redis://127.0.0.1:1/").unwrap(), - }) - } - - fn make_app(state: Arc) -> Router { - Router::new() - .route("/api/dashboard", get(get_dashboard)) - .with_state(state) - } - - #[tokio::test] - async fn test_dashboard_returns_200_without_redis() { - let app = make_app(make_state()); - - let response = app - .oneshot( - Request::builder() - .uri("/api/dashboard") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), axum::http::StatusCode::OK); - } - - #[tokio::test] - async fn test_dashboard_response_shape() { - let app = make_app(make_state()); - - let response = app - .oneshot( - Request::builder() - .uri("/api/dashboard") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - - assert!(json.get("metrics").is_some()); - assert!(json.get("active_recovery_tasks").is_some()); - assert!(json.get("active_alerts").is_some()); - } - - #[tokio::test] - async fn test_dashboard_metrics_fields() { - let state = make_state(); - state.metrics_exporter.update_metrics(42.0, 2048, 120).await; - - let app = make_app(state); - let response = app - .oneshot( - Request::builder() - .uri("/api/dashboard") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - - assert_eq!(json["metrics"]["cpu_usage"], 42.0); - assert_eq!(json["metrics"]["memory_usage"], 2048); - assert_eq!(json["metrics"]["uptime"], 120); - } - - #[tokio::test] - async fn test_dashboard_includes_recovery_tasks() { - use crate::services::error_recovery::RecoveryError; - - let state = make_state(); - state - .error_manager - .handle_error(RecoveryError::Internal("boom".into()), "worker_a") - .await - .unwrap(); - - let app = make_app(state); - let response = app - .oneshot( - Request::builder() - .uri("/api/dashboard") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - - let tasks = json["active_recovery_tasks"].as_array().unwrap(); - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0]["name"], "worker_a"); - } - - #[test] - fn test_dashboard_error_display() { - let err = DashboardError::Serialization( - serde_json::from_str::("bad json").unwrap_err(), - ); - assert!(!err.to_string().is_empty()); - } - - #[test] - fn test_dashboard_data_serialization_roundtrip() { - let data = DashboardData { - metrics: SystemMetrics::default(), - active_recovery_tasks: vec![], - active_alerts: vec![], - }; - let json = serde_json::to_string(&data).unwrap(); - let back: DashboardData = serde_json::from_str(&json).unwrap(); - assert_eq!(back.active_recovery_tasks.len(), 0); - assert_eq!(back.active_alerts.len(), 0); + let back: ContractStats = serde_json::from_str(&json).unwrap(); + assert_eq!(back.contract_id, "test_contract_123"); + assert_eq!(back.invocation_count, 42); } } diff --git a/backend/src/api/handlers/profiling.rs b/backend/src/api/handlers/profiling.rs index a518fba..7e3313f 100644 --- a/backend/src/api/handlers/profiling.rs +++ b/backend/src/api/handlers/profiling.rs @@ -1,27 +1,25 @@ -use axum::extract::State; -use axum::{Json, response::IntoResponse, extract::State}; -use serde::{Serialize, Deserialize}; -use tracing::{info, instrument, info_span}; -use chrono::{DateTime, Utc}; -use crate::error::AppError; -use crate::services::{error_recovery::ErrorManager, sys_metrics::MetricsExporter}; +//! Profiling and health check handlers. + +use std::sync::Arc; + use axum::{extract::State, response::IntoResponse, Json}; use chrono::{DateTime, Utc}; +use redis::Client as RedisClient; use serde::{Deserialize, Serialize}; -use std::sync::Arc; use tracing::{info, instrument}; use utoipa::ToSchema; -use crate::services::{ - sys_metrics::MetricsExporter, - error_recovery::ErrorManager, - log_aggregator::LogAggregator, - tracing::TracingService, + +use crate::{ + config::reload::ConfigManager, + error::AppError, + services::{ + error_recovery::ErrorManager, + log_aggregator::LogAggregator, + sys_metrics::MetricsExporter, + }, }; -use crate::config::reload::ConfigManager; -use crate::api::contracts::{ApiResponse, SystemStatus, ProfileTriggerRequest, ProfileTriggerResponse, ValidatedJson}; -use sqlx::PgPool; -use redis::Client as RedisClient; +/// Shared application state passed to profiling and config handlers. pub struct AppState { pub db: Option, pub metrics_exporter: Arc, @@ -31,84 +29,52 @@ pub struct AppState { pub redis: RedisClient, } +/// Performance metrics snapshot. #[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct MetricsReport { - /// Total system uptime in seconds pub uptime_secs: u64, - /// Current resident set size (RSS) in bytes pub memory_usage_bytes: u64, - /// Number of currently active HTTP requests pub active_requests: u32, - /// Percentage of failed requests in the last window pub error_rate: f64, - /// Current latency for Stellar ledger ingestion in milliseconds pub ledger_ingestion_latency_ms: u32, } +/// Health check response. #[derive(Debug, Serialize, ToSchema)] pub struct HealthResponse { - /// Overall health status (e.g., 'healthy' or 'degraded') pub status: String, - /// The current version of the backend service pub version: String, - /// RFC3339 timestamp of the health check pub timestamp: DateTime, - /// Connectivity status to the PostgreSQL database pub database_connected: bool, - /// Connectivity status to the Redis cache pub redis_connected: bool, } -/// Handler for retrieving detailed performance metrics. -/// Optimized for consumption by monitoring tools like Grafana. +/// `GET /api/v1/profiling/metrics` — Return performance metrics. #[utoipa::path( get, path = "/api/v1/profiling/metrics", responses( - (status = 200, description = "Performance metrics retrieved successfully", body = MetricsReport), + (status = 200, description = "Performance metrics", body = MetricsReport), (status = 500, description = "Internal server error") ), tag = "profiling" )] -#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/v1/profiling/metrics"))] +#[instrument(skip_all)] pub async fn get_metrics( State(state): State>, ) -> Result { - let span = info_span!("metrics.collection"); - let _enter = span.enter(); - info!("Collecting performance metrics"); - - let sys_metrics = state.metrics_exporter.get_metrics().await; - - - // Instrument the metrics exporter call - let metrics_span = TracingService::service_method_span("MetricsExporter", "get_metrics"); - let _metrics_enter = metrics_span.enter(); - let sys_metrics = state.metrics_exporter.get_metrics().await; - drop(_metrics_enter); - - let report = MetricsReport { + Ok(Json(MetricsReport { uptime_secs: sys_metrics.uptime, memory_usage_bytes: sys_metrics.memory_usage, active_requests: 12, error_rate: 0.001, ledger_ingestion_latency_ms: 120, - }; - - info!( - uptime = sys_metrics.uptime, - memory = sys_metrics.memory_usage, - active_requests = 12, - "Metrics collected successfully" - ); - - Ok(Json(report)) + })) } -/// Handler for system health checks. -/// Performs actual pings to downstream services. +/// `GET /api/v1/profiling/health` — System health check. #[utoipa::path( get, path = "/api/v1/profiling/health", @@ -118,138 +84,62 @@ pub async fn get_metrics( ), tag = "profiling" )] -#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/v1/profiling/health"))] +#[instrument(skip_all)] pub async fn get_health( State(state): State>, ) -> Result { - let span = info_span!("health.check"); - let _enter = span.enter(); - info!("Performing system health check"); + let db_healthy = if let Some(ref pool) = state.db { + sqlx::query("SELECT 1") + .fetch_optional(pool) + .await + .map(|r| r.is_some()) + .unwrap_or(false) + } else { + false + }; - - // Check database connectivity with tracing - let db_span = TracingService::db_query_span( - "SELECT 1", - "postgres", - "PING" - ); - let _db_enter = db_span.enter(); - - let db_healthy = sqlx::query("SELECT 1") - .fetch_optional(&state.db) - .await - .map(|result| result.is_some()) - .unwrap_or_else(|e| { - TracingService::record_error(&db_span, &e.to_string(), "database"); - false - }); - drop(_db_enter); - - let response = HealthResponse { + Ok(Json(HealthResponse { status: if db_healthy { "healthy" } else { "degraded" }.to_string(), version: env!("CARGO_PKG_VERSION").to_string(), timestamp: Utc::now(), - database_connected: true, database_connected: db_healthy, redis_connected: true, - }; - - info!( - db_connected = db_healthy, - version = env!("CARGO_PKG_VERSION"), - "Health check completed" - ); - - Ok(Json(response)) + })) } -/// Handler for Prometheus-compatible metrics. -#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/v1/profiling/prometheus"))] +/// `GET /api/v1/profiling/prometheus` — Prometheus-format metrics. +#[instrument(skip_all)] pub async fn get_prometheus_metrics() -> impl IntoResponse { - let span = info_span!("prometheus.metrics.export"); - let _enter = span.enter(); - - info!("Exporting Prometheus-format metrics"); - "# HELP backend_requests_total Total number of requests\n\ - # TYPE backend_requests_total counter\n\ - backend_requests_total 1024\n\ - # HELP backend_ledger_latency_ms Current ledger ingestion latency\n\ - # TYPE backend_ledger_latency_ms gauge\n\ - backend_ledger_latency_ms 120\n" - .to_string() -} - -pub async fn get_system_status(State(state): State>) -> impl IntoResponse { # TYPE backend_requests_total counter\n\ backend_requests_total 1024\n\ # HELP backend_ledger_latency_ms Current ledger ingestion latency\n\ # TYPE backend_ledger_latency_ms gauge\n\ - backend_ledger_latency_ms 120\n".to_string() + backend_ledger_latency_ms 120\n" + .to_string() } -/// Handler for detailed system status -#[instrument(skip_all, fields(http.method = "GET", http.route = "/api/status"))] -pub async fn get_system_status( - State(state): State>, -) -> ApiResponse { -) -> impl IntoResponse { - let span = info_span!("system.status"); - let _enter = span.enter(); - - info!("Retrieving system status"); - - let metrics_span = TracingService::service_method_span("MetricsExporter", "get_metrics"); - let _metrics_enter = metrics_span.enter(); +/// `GET /api/status` — System status summary. +#[instrument(skip_all)] +pub async fn get_system_status(State(state): State>) -> impl IntoResponse { let metrics = state.metrics_exporter.get_metrics().await; - drop(_metrics_enter); - - let recovery_span = TracingService::service_method_span("ErrorManager", "get_active_tasks"); - let _recovery_enter = recovery_span.enter(); let recovery_tasks = state.error_manager.get_active_tasks().await; - drop(_recovery_enter); - - ApiResponse::new(SystemStatus { - status: "healthy".to_string(), - uptime_secs: metrics.uptime, - memory_used_bytes: metrics.memory_usage, - active_recovery_tasks: recovery_tasks.len(), - }) Json(serde_json::json!({ "status": "healthy", - "metrics": metrics, - "active_recovery_tasks": recovery_tasks, + "uptime_secs": metrics.uptime, + "memory_used_bytes": metrics.memory_usage, + "active_recovery_tasks": recovery_tasks.len(), })) } -pub async fn trigger_profile_collection(State(_state): State>) -> impl IntoResponse { -/// Handler to trigger profile collection (CPU, memory profiling) -#[instrument(skip_all, fields(http.method = "POST", http.route = "/api/profile"))] +/// `POST /api/profile` — Trigger profile collection. +#[instrument(skip_all)] pub async fn trigger_profile_collection( State(_state): State>, - ValidatedJson(payload): ValidatedJson, -) -> ApiResponse { - // In a real implementation, this would trigger a CPU/Memory profile - // using the provided payload (duration, sample rate, etc.) - - ApiResponse::new(ProfileTriggerResponse { - profile_id: uuid::Uuid::new_v4(), - message: format!("Profiling collection triggered for label: {}", payload.label), - estimated_completion: chrono::Utc::now() + chrono::Duration::seconds(payload.duration_secs as i64), - }) ) -> impl IntoResponse { - let span = info_span!("profiling.collection"); - let _enter = span.enter(); - let profile_id = uuid::Uuid::new_v4().to_string(); - - info!( - profile_id = %profile_id, - "Profiling collection triggered" - ); - - // In a real implementation, this would trigger a CPU/Memory profile + info!(profile_id = %profile_id, "Profiling collection triggered"); Json(serde_json::json!({ "message": "Profiling collection triggered", "profile_id": profile_id, diff --git a/backend/src/api/mod.rs b/backend/src/api/mod.rs index 01d3bc8..8dd0435 100644 --- a/backend/src/api/mod.rs +++ b/backend/src/api/mod.rs @@ -1,3 +1,3 @@ -pub mod handlers; pub mod contracts; +pub mod handlers; pub mod middleware; diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index c9a0299..56c8d8a 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -1,32 +1,92 @@ +//! Application configuration. +//! +//! - [`Config`] — environment-variable-based startup configuration. +//! - [`AppConfig`] — hot-reloadable runtime configuration. +//! - [`reload`] — [`ConfigManager`] and Axum handlers for live config updates. + pub mod reload; use serde::{Deserialize, Serialize}; +use std::env; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AppConfig { - pub server: ServerConfig, - pub database: DatabaseConfig, - pub redis: RedisConfig, +/// Startup configuration loaded from environment variables. +/// +/// Read once at process start. For values that change at runtime without a +/// restart, see [`AppConfig`] and [`reload::ConfigManager`]. +#[derive(Debug, Deserialize, Clone)] +pub struct Config { + pub database_url: String, + pub redis_url: String, + pub server_port: u16, + pub environment: String, pub log_level: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] +impl Config { + /// Load configuration from environment variables (`.env` file optional). + pub fn from_env() -> Result { + dotenvy::dotenv().ok(); + + Ok(Config { + database_url: env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://postgres:password@localhost:5432/backend".into()), + redis_url: env::var("REDIS_URL") + .unwrap_or_else(|_| "redis://localhost:6379".into()), + server_port: env::var("PORT") + .unwrap_or_else(|_| "3000".into()) + .parse()?, + environment: env::var("APP_ENV") + .unwrap_or_else(|_| "development".into()), + log_level: env::var("LOG_LEVEL") + .unwrap_or_else(|_| "info".into()), + }) + } +} + +// --------------------------------------------------------------------------- +// AppConfig — hot-reloadable runtime configuration +// --------------------------------------------------------------------------- + +/// Server bind configuration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ServerConfig { pub host: String, pub port: u16, } -#[derive(Debug, Clone, Serialize, Deserialize)] +/// Database pool configuration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct DatabaseConfig { pub url: String, pub max_connections: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +/// Redis connection configuration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct RedisConfig { pub url: String, } +/// Live application configuration that can be hot-reloaded at runtime. +/// +/// All fields have sensible defaults so the application starts without any +/// external configuration source. Use [`reload::ConfigManager`] to update +/// these values without restarting the process. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AppConfig { + pub server: ServerConfig, + pub database: DatabaseConfig, + pub redis: RedisConfig, + /// Tracing / log filter directive (e.g. `"backend=debug"`). + pub log_level: String, + /// Maximum number of database connections in the pool. + pub max_connections: u32, + /// Request timeout in seconds. + pub request_timeout_secs: u64, + /// Whether the maintenance mode banner is shown. + pub maintenance_mode: bool, +} + impl Default for AppConfig { fn default() -> Self { Self { @@ -36,42 +96,15 @@ impl Default for AppConfig { }, database: DatabaseConfig { url: "postgres://postgres:postgres@localhost:5432/crucible".to_string(), - max_connections: 5, + max_connections: 10, }, redis: RedisConfig { url: "redis://127.0.0.1:6379".to_string(), }, - log_level: "info".to_string(), + log_level: "backend=debug,tower_http=debug".to_string(), + max_connections: 10, + request_timeout_secs: 30, + maintenance_mode: false, } -//! Application configuration. - -pub mod reload; - -use serde::Deserialize; -use std::env; - -/// Environment-based application configuration. -#[derive(Debug, Deserialize, Clone)] -pub struct Config { - pub database_url: String, - pub redis_url: String, - pub server_port: u16, - pub environment: String, - pub log_level: String, -} - -impl Config { - /// Loads configuration from environment variables. - pub fn from_env() -> Result { - dotenvy::dotenv().ok(); - - Ok(Config { - database_url: env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://postgres:password@localhost:5432/backend".into()), - redis_url: env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".into()), - server_port: env::var("PORT").unwrap_or_else(|_| "3000".into()).parse()?, - environment: env::var("APP_ENV").unwrap_or_else(|_| "development".into()), - log_level: env::var("LOG_LEVEL").unwrap_or_else(|_| "info".into()), - }) } } diff --git a/backend/src/config/reload.rs b/backend/src/config/reload.rs index b56caa6..13dbbab 100644 --- a/backend/src/config/reload.rs +++ b/backend/src/config/reload.rs @@ -1,263 +1,193 @@ +//! Configuration hot-reload. +//! +//! This module provides two complementary APIs: +//! +//! ## [`ConfigManager`] — patch-based updates +//! +//! Wraps [`AppConfig`] in an [`arc_swap::ArcSwap`] for lock-free reads. +//! Supports atomic replacement via [`ConfigManager::reload`] and partial +//! JSON-patch updates via [`ConfigManager::update_from_patch`]. +//! +//! ## [`ConfigWatcher`] — Redis pub/sub driven reload +//! +//! Subscribes to the `config:reload` Redis channel. On every message it +//! fetches the JSON stored at `config:current`, deserialises it, and +//! atomically swaps the in-memory value. All readers that hold a +//! [`ConfigHandle`] see the new values on their next read. +//! +//! # Axum handlers +//! +//! | Route | Handler | Description | +//! |---|---|---| +//! | `GET /api/config` | [`handle_get_config`] | Return current config as JSON | +//! | `POST /api/config/reload` | [`handle_reload`] | Reload config from `config.json` | +//! +//! # Redis protocol +//! +//! ```text +//! SET config:current '{"log_level":"info","max_connections":50,...}' +//! PUBLISH config:reload "reload" +//! ``` +//! +//! # Example +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use backend::config::{AppConfig, reload::ConfigWatcher}; +//! +//! # async fn example() { +//! let watcher = Arc::new(ConfigWatcher::new(AppConfig::default())); +//! let handle = watcher.handle(); +//! +//! // Read the current config +//! let cfg = handle.get().await; +//! println!("log level: {}", cfg.log_level); +//! +//! // Trigger a manual reload +//! watcher.reload(AppConfig { maintenance_mode: true, ..AppConfig::default() }).await; +//! # } +//! ``` + use std::sync::Arc; + use arc_swap::ArcSwap; -use axum::{ - extract::State, - http::StatusCode, - response::IntoResponse, - Json, -}; +use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; +use redis::{AsyncCommands, Client as RedisClient}; use serde_json::Value; use thiserror::Error; -use tracing::{info, warn, instrument}; +use tokio::sync::{watch, RwLock}; +use tracing::{error, info, instrument, warn}; + use crate::config::AppConfig; +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + /// Errors that can occur during configuration reload. #[derive(Debug, Error)] -pub enum ConfigReloadError { - #[error("IO error: {0}")] - Io(#[from] std::io::Error), +pub enum ReloadError { + /// A Redis error occurred. + #[error("Redis error: {0}")] + Redis(#[from] redis::RedisError), - #[error("Serialization error: {0}")] - Serialization(#[from] serde_json::Error), + /// The configuration value could not be deserialised. + #[error("Config deserialisation error: {0}")] + Deserialise(#[from] serde_json::Error), + + /// The configuration key was not found in Redis. + #[error("Config key not found in Redis")] + NotFound, - #[error("Internal error: {0}")] - Internal(String), + /// An I/O error occurred (e.g. reading config.json). + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + /// The configuration value was semantically invalid. #[error("Invalid configuration: {0}")] Invalid(String), } -impl IntoResponse for ConfigReloadError { +impl IntoResponse for ReloadError { fn into_response(self) -> axum::response::Response { - let (status, message) = match self { - ConfigReloadError::Io(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - ConfigReloadError::Serialization(_) => (StatusCode::BAD_REQUEST, self.to_string()), - ConfigReloadError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - ConfigReloadError::Invalid(_) => (StatusCode::BAD_REQUEST, self.to_string()), + let status = match self { + ReloadError::Invalid(_) | ReloadError::Deserialise(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, }; - - let body = Json(serde_json::json!({ - "error": message, - "status": status.as_u16() - })); - - (status, body).into_response() + (status, Json(serde_json::json!({ "error": self.to_string() }))).into_response() } } -/// Manages hot-reloadable application configuration. +// --------------------------------------------------------------------------- +// ConfigManager — ArcSwap-based, patch-capable +// --------------------------------------------------------------------------- + +/// Manages hot-reloadable application configuration via lock-free reads. +/// +/// Wrap in an [`Arc`] and share across Axum handlers via application state. pub struct ConfigManager { - current_config: ArcSwap, + current: ArcSwap, } impl ConfigManager { - /// Create a new ConfigManager with the default configuration. - pub fn new(initial_config: AppConfig) -> Self { + /// Create a new manager with the given initial configuration. + pub fn new(initial: AppConfig) -> Self { Self { - current_config: ArcSwap::from(Arc::new(initial_config)), + current: ArcSwap::from(Arc::new(initial)), } } - /// Get a reference to the current configuration. + /// Return a snapshot of the current configuration. + /// + /// This is a lock-free read — safe to call from hot paths. pub fn load(&self) -> Arc { - self.current_config.load_full() + self.current.load_full() } - /// Reload the configuration from a file or environment. - /// In this implementation, we simulate loading from a local `config.json` file. + /// Atomically replace the current configuration. + /// + /// Reads the JSON value from `config.json` in the current directory, + /// validates it, and swaps it in. #[instrument(skip(self))] - pub async fn reload(&self) -> Result<(), ConfigReloadError> { - info!("Starting configuration reload..."); - - // In a real scenario, we would load from a file or external service. - // For this task, we'll look for `config.json` in the current directory. - let config_path = "config.json"; - - if !std::path::Path::new(config_path).exists() { - warn!("config.json not found, skipping reload"); - return Err(ConfigReloadError::Io(std::io::Error::new( + pub async fn reload(&self) -> Result<(), ReloadError> { + info!("Starting configuration reload from config.json"); + + let path = "config.json"; + if !std::path::Path::new(path).exists() { + warn!("config.json not found, aborting reload"); + return Err(ReloadError::Io(std::io::Error::new( std::io::ErrorKind::NotFound, "config.json not found", ))); } - let content = tokio::fs::read_to_string(config_path).await?; + let content = tokio::fs::read_to_string(path).await?; let new_config: AppConfig = serde_json::from_str(&content)?; - // Validate config (e.g., check database URL format) if new_config.database.url.is_empty() { - return Err(ConfigReloadError::Invalid("Database URL cannot be empty".to_string())); + return Err(ReloadError::Invalid("database.url cannot be empty".into())); } - // Update the global configuration - self.current_config.store(Arc::new(new_config)); - - info!("Configuration successfully reloaded"); + self.current.store(Arc::new(new_config)); + info!("Configuration reloaded successfully"); Ok(()) } - /// Update configuration from a JSON value (e.g., from an API request). + /// Apply a partial JSON patch to the current configuration. + /// + /// Top-level and one-level-deep object keys are merged; all other values + /// are replaced. Returns an error if the result cannot be deserialised + /// into [`AppConfig`]. #[instrument(skip(self, patch))] - pub fn update_from_patch(&self, patch: Value) -> Result<(), ConfigReloadError> { + pub fn update_from_patch(&self, patch: Value) -> Result<(), ReloadError> { let current = self.load(); let mut current_json = serde_json::to_value(&*current)?; - - // Deep merge patch into current configuration - if let Some(patch_obj) = patch.as_object() { - if let Some(current_obj) = current_json.as_object_mut() { - for (k, v) in patch_obj { - if v.is_object() && current_obj.contains_key(k) && current_obj[k].is_object() { - // Merge nested objects - let sub_patch = v.as_object().unwrap(); - let sub_current = current_obj.get_mut(k).unwrap().as_object_mut().unwrap(); - for (sk, sv) in sub_patch { - sub_current.insert(sk.clone(), sv.clone()); + + if let (Some(patch_obj), Some(current_obj)) = + (patch.as_object(), current_json.as_object_mut()) + { + for (k, v) in patch_obj { + if v.is_object() { + if let Some(sub) = current_obj.get_mut(k).and_then(|s| s.as_object_mut()) { + for (sk, sv) in v.as_object().unwrap() { + sub.insert(sk.clone(), sv.clone()); } - } else { - // Direct replacement for non-objects or new keys - current_obj.insert(k.clone(), v.clone()); + continue; } } + current_obj.insert(k.clone(), v.clone()); } } let new_config: AppConfig = serde_json::from_value(current_json)?; - self.current_config.store(Arc::new(new_config)); - + self.current.store(Arc::new(new_config)); info!("Configuration updated via patch"); Ok(()) } } -/// Axum handler to trigger a configuration reload. -pub async fn handle_reload( - State(state): State>, -) -> Result { - state.config_manager.reload().await?; - Ok((StatusCode::OK, Json(serde_json::json!({ "status": "reloaded" })))) -} - -/// Axum handler to get the current configuration (sanitized). -pub async fn handle_get_config( - State(state): State>, -) -> impl IntoResponse { - let config = state.config_manager.load(); - // In a real app, we would sanitize sensitive fields like DB passwords - Json(config) -//! Configuration hot-reload. -//! -//! This module provides [`ConfigWatcher`], which holds the live [`AppConfig`] -//! behind an `Arc>` and can reload it at any time — either -//! programmatically via [`ConfigWatcher::reload`] or automatically by -//! subscribing to a Redis pub/sub channel with [`ConfigWatcher::watch`]. -//! -//! When a reload message arrives on the Redis channel the watcher fetches the -//! new configuration JSON from a Redis key, deserialises it, and atomically -//! swaps the in-memory value. All readers that hold a clone of the -//! [`ConfigHandle`] see the new values on their next read without any restart. -//! -//! # Example -//! -//! ```rust,no_run -//! use backend::config::reload::{AppConfig, ConfigWatcher}; -//! -//! # async fn example() -> anyhow::Result<()> { -//! let watcher = ConfigWatcher::new(AppConfig::default()); -//! let handle = watcher.handle(); -//! -//! // Read the current config -//! let cfg = handle.get().await; -//! println!("log level: {}", cfg.log_level); -//! -//! // Trigger a manual reload -//! watcher.reload(AppConfig { -//! log_level: "info".to_string(), -//! ..AppConfig::default() -//! }).await; -//! # Ok(()) -//! # } -//! ``` -//! -//! # Redis protocol -//! -//! Publish any non-empty string to `config:reload` to trigger a reload: -//! -//! ```text -//! PUBLISH config:reload "" -//! SET config:current '{"log_level":"info","max_connections":50,...}' -//! PUBLISH config:reload "reload" -//! ``` -//! -//! The watcher reads `config:current` from Redis after every message on -//! `config:reload`. If the key is absent or unparseable the existing config -//! is kept and an error is logged. - -#![allow(dead_code)] - -use std::sync::Arc; - -use redis::{AsyncCommands, Client as RedisClient}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::sync::{watch, RwLock}; -use tracing::{error, info, warn}; - // --------------------------------------------------------------------------- -// Error type -// --------------------------------------------------------------------------- - -/// Errors that can occur during configuration reload. -#[derive(Debug, Error)] -pub enum ReloadError { - /// A Redis error occurred. - #[error("Redis error: {0}")] - Redis(#[from] redis::RedisError), - - /// The configuration value could not be deserialised. - #[error("Config deserialisation error: {0}")] - Deserialise(#[from] serde_json::Error), - - /// The configuration key was not found in Redis. - #[error("Config key not found in Redis")] - NotFound, -} - -// --------------------------------------------------------------------------- -// AppConfig -// --------------------------------------------------------------------------- - -/// Live application configuration that can be hot-reloaded at runtime. -/// -/// All fields have sensible defaults so the application starts without any -/// external configuration source. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct AppConfig { - /// Tracing / log filter directive (e.g. `"backend=debug"`). - pub log_level: String, - /// Maximum number of database connections in the pool. - pub max_connections: u32, - /// Request timeout in seconds. - pub request_timeout_secs: u64, - /// Whether the maintenance mode banner is shown. - pub maintenance_mode: bool, - /// Redis key that stores the serialised [`AppConfig`] JSON. - pub redis_config_key: String, -} - -impl Default for AppConfig { - fn default() -> Self { - Self { - log_level: "backend=debug,tower_http=debug".to_string(), - max_connections: 10, - request_timeout_secs: 30, - maintenance_mode: false, - redis_config_key: "config:current".to_string(), - } - } -} - -// --------------------------------------------------------------------------- -// ConfigHandle — cheap clone, shared reader +// ConfigHandle — cheap clone, shared reader with change notification // --------------------------------------------------------------------------- /// A cheap-to-clone handle to the live configuration. @@ -268,7 +198,6 @@ impl Default for AppConfig { #[derive(Clone)] pub struct ConfigHandle { inner: Arc>, - /// Notified whenever the config is reloaded. changed: watch::Receiver<()>, } @@ -280,17 +209,18 @@ impl ConfigHandle { /// Wait until the configuration changes, then return the new snapshot. pub async fn wait_for_change(&mut self) -> AppConfig { - // `changed()` resolves immediately if there is an unseen change. let _ = self.changed.changed().await; self.get().await } } // --------------------------------------------------------------------------- -// ConfigWatcher +// ConfigWatcher — Redis pub/sub driven reload // --------------------------------------------------------------------------- -/// Owns the live [`AppConfig`] and drives hot-reload. +/// Owns the live [`AppConfig`] and drives hot-reload via Redis pub/sub. +/// +/// Wrap in an [`Arc`] to share across tasks. pub struct ConfigWatcher { inner: Arc>, notify_tx: watch::Sender<()>, @@ -317,6 +247,9 @@ impl ConfigWatcher { } /// Atomically replace the current configuration and notify all handles. + /// + /// If the new config is identical to the current one, no notification is + /// sent. pub async fn reload(&self, new_config: AppConfig) { let old = { let mut guard = self.inner.write().await; @@ -331,7 +264,6 @@ impl ConfigWatcher { maintenance_mode = new_config.maintenance_mode, "Configuration reloaded" ); - // Ignore send error — it only fails when all receivers are dropped. let _ = self.notify_tx.send(()); } else { info!("Configuration reload requested but values unchanged"); @@ -340,16 +272,17 @@ impl ConfigWatcher { /// Fetch the current configuration from Redis and apply it. /// - /// Reads the JSON value stored at `AppConfig::redis_config_key` (default - /// `config:current`), deserialises it, and calls [`Self::reload`]. + /// Reads the JSON value stored at the key `config:current`, deserialises + /// it, and calls [`Self::reload`]. /// /// # Errors + /// /// Returns [`ReloadError`] if the Redis key is absent, the connection /// fails, or the JSON cannot be deserialised. pub async fn reload_from_redis(&self, redis: &RedisClient) -> Result<(), ReloadError> { - let key = self.inner.read().await.redis_config_key.clone(); + const KEY: &str = "config:current"; let mut conn = redis.get_multiplexed_async_connection().await?; - let raw: Option = conn.get(&key).await?; + let raw: Option = conn.get(KEY).await?; let json = raw.ok_or(ReloadError::NotFound)?; let new_config: AppConfig = serde_json::from_str(&json)?; self.reload(new_config).await; @@ -359,14 +292,13 @@ impl ConfigWatcher { /// Spawn a background task that subscribes to `config:reload` on Redis /// and calls [`Self::reload_from_redis`] on every message. /// - /// The task runs until the Redis connection is lost or the process exits. + /// The task runs until the Redis pub/sub stream ends or the process exits. /// Connection errors are logged and the task exits — callers may restart /// it if desired. pub fn watch(self: Arc, redis: RedisClient) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { const CHANNEL: &str = "config:reload"; - // get_async_connection is the only way to obtain a PubSub-capable connection. #[allow(deprecated)] let conn = match redis.get_async_connection().await { Ok(c) => c, @@ -382,13 +314,10 @@ impl ConfigWatcher { return; } - info!( - channel = CHANNEL, - "Config watcher: listening for reload signals" - ); + info!(channel = CHANNEL, "Config watcher: listening for reload signals"); - let mut stream = pubsub.into_on_message(); use futures_util::StreamExt; + let mut stream = pubsub.into_on_message(); loop { match stream.next().await { @@ -409,6 +338,32 @@ impl ConfigWatcher { } } +// --------------------------------------------------------------------------- +// Axum handlers +// --------------------------------------------------------------------------- + +/// `POST /api/config/reload` — Reload configuration from `config.json`. +/// +/// Returns `200 OK` on success or an error response if the file is missing +/// or the JSON is invalid. +pub async fn handle_reload( + State(state): State>, +) -> Result { + state.config_manager.reload().await?; + Ok((StatusCode::OK, Json(serde_json::json!({ "status": "reloaded" })))) +} + +/// `GET /api/config` — Return the current configuration as JSON. +/// +/// Sensitive fields (e.g. database passwords embedded in URLs) are returned +/// as-is; callers should restrict access to this endpoint appropriately. +pub async fn handle_get_config( + State(state): State>, +) -> impl IntoResponse { + let config = state.config_manager.load(); + Json(config) +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -416,50 +371,66 @@ impl ConfigWatcher { #[cfg(test)] mod tests { use super::*; + use crate::config::AppConfig; - fn default_watcher() -> ConfigWatcher { - ConfigWatcher::new(AppConfig::default()) + // --- ConfigManager --- + + #[tokio::test] + async fn test_manager_load_returns_initial() { + let mgr = ConfigManager::new(AppConfig::default()); + assert_eq!(*mgr.load(), AppConfig::default()); } - // --- AppConfig --- + #[tokio::test] + async fn test_manager_reload_missing_file() { + let mgr = ConfigManager::new(AppConfig::default()); + let result = mgr.reload().await; + assert!(matches!(result, Err(ReloadError::Io(_)))); + // Config must be unchanged. + assert_eq!(*mgr.load(), AppConfig::default()); + } #[test] - fn test_default_config_values() { - let cfg = AppConfig::default(); - assert_eq!(cfg.max_connections, 10); - assert_eq!(cfg.request_timeout_secs, 30); - assert!(!cfg.maintenance_mode); - assert!(!cfg.log_level.is_empty()); - assert_eq!(cfg.redis_config_key, "config:current"); + fn test_manager_patch_top_level_field() { + let mgr = ConfigManager::new(AppConfig::default()); + mgr.update_from_patch(serde_json::json!({ "log_level": "warn" })) + .unwrap(); + assert_eq!(mgr.load().log_level, "warn"); } #[test] - fn test_config_serialisation_roundtrip() { - let cfg = AppConfig::default(); - let json = serde_json::to_string(&cfg).unwrap(); - let back: AppConfig = serde_json::from_str(&json).unwrap(); - assert_eq!(cfg, back); + fn test_manager_patch_nested_field() { + let mgr = ConfigManager::new(AppConfig::default()); + mgr.update_from_patch(serde_json::json!({ "server": { "port": 4000 } })) + .unwrap(); + let cfg = mgr.load(); + assert_eq!(cfg.server.port, 4000); + // Other nested fields preserved. + assert_eq!(cfg.server.host, "0.0.0.0"); } #[test] - fn test_config_partial_deserialisation() { - // Only some fields present — rest should use serde defaults. - let json = r#"{"log_level":"info","max_connections":25,"request_timeout_secs":60,"maintenance_mode":true,"redis_config_key":"config:current"}"#; - let cfg: AppConfig = serde_json::from_str(json).unwrap(); - assert_eq!(cfg.log_level, "info"); - assert_eq!(cfg.max_connections, 25); + fn test_manager_patch_preserves_unpatched_fields() { + let mgr = ConfigManager::new(AppConfig::default()); + mgr.update_from_patch(serde_json::json!({ "maintenance_mode": true })) + .unwrap(); + let cfg = mgr.load(); assert!(cfg.maintenance_mode); + assert_eq!(cfg.max_connections, 10); // unchanged } - // --- ConfigWatcher::reload --- + // --- ConfigWatcher --- + + fn default_watcher() -> ConfigWatcher { + ConfigWatcher::new(AppConfig::default()) + } #[tokio::test] - async fn test_reload_updates_config() { + async fn test_watcher_reload_updates_config() { let watcher = default_watcher(); let handle = watcher.handle(); let new_cfg = AppConfig { - log_level: "info".to_string(), max_connections: 50, ..AppConfig::default() }; @@ -469,78 +440,53 @@ mod tests { } #[tokio::test] - async fn test_reload_unchanged_does_not_notify() { + async fn test_watcher_reload_unchanged_no_notify() { let watcher = default_watcher(); let mut handle = watcher.handle(); - - // Mark the initial value as seen. handle.changed.borrow_and_update(); - // Reload with identical config. watcher.reload(AppConfig::default()).await; - // `has_changed` should be false — no notification was sent. assert!(!handle.changed.has_changed().unwrap()); } #[tokio::test] - async fn test_reload_changed_notifies_handle() { + async fn test_watcher_reload_changed_notifies() { let watcher = default_watcher(); let mut handle = watcher.handle(); - handle.changed.borrow_and_update(); watcher - .reload(AppConfig { - maintenance_mode: true, - ..AppConfig::default() - }) + .reload(AppConfig { maintenance_mode: true, ..AppConfig::default() }) .await; assert!(handle.changed.has_changed().unwrap()); } - // --- ConfigHandle --- - #[tokio::test] - async fn test_handle_get_returns_current() { - let watcher = default_watcher(); - let handle = watcher.handle(); - assert_eq!(handle.get().await, AppConfig::default()); - } - - #[tokio::test] - async fn test_multiple_handles_see_same_update() { + async fn test_watcher_multiple_handles_see_update() { let watcher = default_watcher(); let h1 = watcher.handle(); let h2 = watcher.handle(); - let new_cfg = AppConfig { - max_connections: 99, - ..AppConfig::default() - }; - watcher.reload(new_cfg.clone()).await; + watcher + .reload(AppConfig { max_connections: 99, ..AppConfig::default() }) + .await; assert_eq!(h1.get().await.max_connections, 99); assert_eq!(h2.get().await.max_connections, 99); } #[tokio::test] - async fn test_wait_for_change_resolves_after_reload() { + async fn test_watcher_wait_for_change() { let watcher = Arc::new(default_watcher()); let mut handle = watcher.handle(); - - // Mark current as seen so wait_for_change actually waits. handle.changed.borrow_and_update(); - let watcher2 = Arc::clone(&watcher); + let w2 = Arc::clone(&watcher); tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - watcher2 - .reload(AppConfig { - maintenance_mode: true, - ..AppConfig::default() - }) + w2.reload(AppConfig { maintenance_mode: true, ..AppConfig::default() }) .await; }); @@ -548,30 +494,54 @@ mod tests { assert!(updated.maintenance_mode); } - // --- reload_from_redis (no live Redis — error path) --- - #[tokio::test] - async fn test_reload_from_redis_connection_error() { + async fn test_watcher_reload_from_redis_connection_error() { let watcher = default_watcher(); - // Port 1 is never open — connection will fail immediately. + // Port 1 is never open. let redis = RedisClient::open("redis://127.0.0.1:1/").unwrap(); let result = watcher.reload_from_redis(&redis).await; assert!(matches!(result, Err(ReloadError::Redis(_)))); - // Config must be unchanged. assert_eq!(watcher.handle().get().await, AppConfig::default()); } - // --- ReloadError display --- + // --- ReloadError --- #[test] fn test_reload_error_not_found_display() { - let e = ReloadError::NotFound; - assert!(e.to_string().contains("not found")); + assert!(ReloadError::NotFound.to_string().contains("not found")); + } + + #[test] + fn test_reload_error_invalid_display() { + let e = ReloadError::Invalid("bad value".into()); + assert!(e.to_string().contains("bad value")); } #[test] fn test_reload_error_deserialise_display() { - let e = ReloadError::Deserialise(serde_json::from_str::("bad").unwrap_err()); + let inner = serde_json::from_str::("not json").unwrap_err(); + let e = ReloadError::Deserialise(inner); assert!(!e.to_string().is_empty()); } + + // --- AppConfig --- + + #[test] + fn test_appconfig_default_values() { + let cfg = AppConfig::default(); + assert_eq!(cfg.max_connections, 10); + assert_eq!(cfg.request_timeout_secs, 30); + assert!(!cfg.maintenance_mode); + assert!(!cfg.log_level.is_empty()); + assert_eq!(cfg.server.port, 3000); + assert_eq!(cfg.server.host, "0.0.0.0"); + } + + #[test] + fn test_appconfig_serialisation_roundtrip() { + let cfg = AppConfig::default(); + let json = serde_json::to_string(&cfg).unwrap(); + let back: AppConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(cfg, back); + } } diff --git a/backend/src/error.rs b/backend/src/error.rs index 3781fa6..68f2f9e 100644 --- a/backend/src/error.rs +++ b/backend/src/error.rs @@ -8,20 +8,8 @@ use axum::{ response::{IntoResponse, Response}, Json, }; -use serde_json::json; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum AppError { - #[error("Database error: {0}")] - DatabaseError(#[from] sqlx::Error), - - #[error("Redis error: {0}")] - RedisError(#[from] redis::RedisError), - - #[error("Internal server error")] - InternalServerError, use serde::Serialize; +use thiserror::Error; /// Structured error response returned to API clients. #[derive(Debug, Serialize)] @@ -33,20 +21,7 @@ pub struct ErrorResponse { } /// Application-level error type that unifies all possible error sources. -/// -/// Each variant maps to an HTTP status code and produces a consistent -/// JSON error response via the [`IntoResponse`] implementation. -/// -/// # Examples -/// -/// ```rust,no_run -/// use crucible_backend::error::AppError; -/// -/// async fn handler() -> Result { -/// Err(AppError::NotFound("Contract not found".into())) -/// } -/// ``` -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Error)] pub enum AppError { /// 404 — The requested resource was not found. #[error("Not found: {0}")] @@ -83,57 +58,18 @@ pub enum AppError { /// 500 — A catch-all for unexpected internal errors. #[error("Internal error: {0}")] InternalError(String), -use serde_json::json; -use thiserror::Error; -use tracing::error; - -#[derive(Debug, Error)] -pub enum AppError { - #[error("Database error: {0}")] - Database(#[from] sqlx::Error), - - #[error("Redis error: {0}")] - Redis(#[from] redis::RedisError), - - #[error("Serialization error: {0}")] - Serialization(#[from] serde_json::Error), - - #[error("Internal server error")] - Internal, - - #[error("Not found: {0}")] - NotFound(String), - - #[error("Validation error: {0}")] - ValidationError(String), - #[error("Invalid request: {0}")] - BadRequest(String), - - #[error("Unauthorized")] - Unauthorized, + /// 502 — A Stellar network operation failed. #[error("Stellar operation failed: {0}")] StellarError(String), } impl IntoResponse for AppError { fn into_response(self) -> Response { - let (status, error_message) = match self { - AppError::DatabaseError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - AppError::RedisError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - AppError::InternalServerError => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - AppError::NotFound(msg) => (StatusCode::NOT_FOUND, msg), - AppError::ValidationError(msg) => (StatusCode::BAD_REQUEST, msg), - }; - - let body = Json(json!({ - "error": error_message, let (status, code, message) = match &self { AppError::NotFound(msg) => (StatusCode::NOT_FOUND, "not_found", msg.clone()), AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, "bad_request", msg.clone()), - AppError::Unauthorized(msg) => { - (StatusCode::UNAUTHORIZED, "unauthorized", msg.clone()) - } + AppError::Unauthorized(msg) => (StatusCode::UNAUTHORIZED, "unauthorized", msg.clone()), AppError::Forbidden(msg) => (StatusCode::FORBIDDEN, "forbidden", msg.clone()), AppError::Conflict(msg) => (StatusCode::CONFLICT, "conflict", msg.clone()), AppError::ValidationError(msg) => { @@ -163,6 +99,14 @@ impl IntoResponse for AppError { "An internal error occurred".to_string(), ) } + AppError::StellarError(msg) => { + tracing::error!("Stellar error: {msg}"); + ( + StatusCode::BAD_GATEWAY, + "stellar_error", + "Failed to communicate with Stellar network".to_string(), + ) + } }; ( @@ -181,13 +125,13 @@ mod tests { use super::*; #[test] - fn test_not_found_error_display() { + fn test_not_found_display() { let err = AppError::NotFound("Contract not found".into()); assert_eq!(err.to_string(), "Not found: Contract not found"); } #[test] - fn test_bad_request_error_display() { + fn test_bad_request_display() { let err = AppError::BadRequest("Invalid address format".into()); assert_eq!(err.to_string(), "Bad request: Invalid address format"); } @@ -213,45 +157,5 @@ mod tests { let json = serde_json::to_string(&resp).unwrap(); assert!(json.contains("\"code\":\"not_found\"")); assert!(json.contains("\"message\":\"Resource not found\"")); - let (status, message) = match self { - AppError::Database(ref e) => { - error!("Database error occurred: {:?}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "A database error occurred".to_string(), - ) - } - AppError::Redis(ref e) => { - error!("Redis error occurred: {:?}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "A cache error occurred".to_string(), - ) - } - AppError::NotFound(msg) => (StatusCode::NOT_FOUND, msg), - AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg), - AppError::Unauthorized => (StatusCode::UNAUTHORIZED, "Unauthorized access".to_string()), - AppError::StellarError(msg) => { - error!("Stellar error: {}", msg); - ( - StatusCode::BAD_GATEWAY, - "Failed to communicate with Stellar network".to_string(), - ) - } - _ => { - error!("Internal error: {:?}", self); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "An internal server error occurred".to_string(), - ) - } - }; - - let body = Json(json!({ - "error": message, - "code": status.as_u16(), - })); - - (status, body).into_response() } } diff --git a/backend/src/jobs.rs b/backend/src/jobs.rs index 2468029..d9f530a 100644 --- a/backend/src/jobs.rs +++ b/backend/src/jobs.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use tracing::{info, instrument}; + use crate::services::tracing::TracingService; #[derive(Debug, Serialize, Deserialize)] @@ -8,17 +9,12 @@ pub struct TransactionMonitorJob { } /// Handler for monitoring Stellar transactions. -/// Returning () since Apalis 0.6 handlers can return (). -pub async fn monitor_transaction(job: TransactionMonitorJob) { #[instrument(skip_all, fields(job.name = "monitor_transaction", job.id = %job.tx_hash))] -pub async fn monitor_transaction( - job: TransactionMonitorJob, -) { +pub async fn monitor_transaction(job: TransactionMonitorJob) { let span = TracingService::job_span("monitor_transaction", &job.tx_hash); let _enter = span.enter(); - + info!("Monitoring Stellar transaction: {}", job.tx_hash); tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - info!("Transaction monitoring completed: {}", job.tx_hash); } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index bea007e..9432258 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,14 +1,13 @@ -pub mod utils; pub mod api; pub mod config; pub mod db; pub mod error; pub mod jobs; pub mod services; -pub mod config; pub mod telemetry; +pub mod utils; + #[cfg(any(test, feature = "testutils"))] pub mod test_utils; -pub mod utils; pub use error::AppError; diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index c9ffa9b..3585ed5 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,9 +1,8 @@ -pub mod log_alerts; pub mod alerts; +pub mod business_metrics; pub mod error_recovery; pub mod feature_flags; pub mod log_aggregator; pub mod log_alerts; pub mod sys_metrics; -pub mod business_metrics; pub mod tracing; diff --git a/backend/src/telemetry.rs b/backend/src/telemetry.rs index 77009b8..5b212e9 100644 --- a/backend/src/telemetry.rs +++ b/backend/src/telemetry.rs @@ -1,7 +1,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; /// Initializes the tracing subscriber for observability. -/// Uses EnvFilter to allow dynamic log level control. pub fn init_telemetry() { tracing_subscriber::registry() .with(