diff --git a/src/cache.rs b/src/cache.rs index c41990f..52f0ff8 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,3 +1,4 @@ +use crate::metrics; use std::sync::Arc; use std::time::Duration; @@ -17,6 +18,10 @@ pub trait CacheBackend: Send + Sync { self.set(key, value).await; } async fn delete(&self, key: &str); + + fn entry_count(&self) -> u64 { + 0 + } } /// Type-erased cache handle, cheaply cloneable. @@ -100,6 +105,10 @@ impl CacheBackend for MokaCache { async fn delete(&self, key: &str) { self.inner.invalidate(key); } + + fn entry_count(&self) -> u64 { + self.inner.entry_count() + } } // ─── Redis / Redis Sentinel ───────────────────────────────────────────────── @@ -204,9 +213,9 @@ impl RedisCache { } } - /// Attempt to reconnect via Sentinel (cold path, mutex-protected). async fn try_reconnect(&self) { if let Ok(mut sentinel) = self.sentinel.try_lock() { + metrics::global().redis_reconnections_total.inc(); match sentinel.get_async_connection().await { Ok(new_conn) => { self.conn.store(Arc::new(new_conn)); @@ -217,7 +226,6 @@ impl RedisCache { } } } - // If try_lock fails, another task is already reconnecting — skip. } } @@ -227,8 +235,19 @@ impl CacheBackend for RedisCache { let redis_key = self.prefixed(key); let mut conn = (*self.conn.load_full()).clone(); match redis::AsyncCommands::get::<_, Option>(&mut conn, &redis_key).await { - Ok(v) => v, + Ok(v) => { + metrics::global() + .redis_operations_total + .with_label_values(&["get", "ok"]) + .inc(); + v + } Err(e) => { + metrics::global() + .redis_operations_total + .with_label_values(&["get", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis GET failed, falling back to moka"); self.try_reconnect().await; self.fallback.get(key) @@ -242,8 +261,18 @@ impl CacheBackend for RedisCache { let res: Result<(), _> = redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, self.ttl_secs).await; if let Err(e) = res { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis SET failed, falling back to moka"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "ok"]) + .inc(); } // Always populate local fallback for graceful degradation. self.fallback.insert(key, value); @@ -256,8 +285,18 @@ impl CacheBackend for RedisCache { let res: Result<(), _> = redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, ttl_secs).await; if let Err(e) = res { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis SET failed, falling back to moka"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "ok"]) + .inc(); } // Always populate local fallback for graceful degradation. self.fallback.insert(key, value); @@ -267,11 +306,24 @@ impl CacheBackend for RedisCache { let redis_key = self.prefixed(key); let mut conn = (*self.conn.load_full()).clone(); if let Err(e) = redis::AsyncCommands::del::<_, ()>(&mut conn, &redis_key).await { + metrics::global() + .redis_operations_total + .with_label_values(&["del", "error"]) + .inc(); tracing::debug!(error = %e, key, "redis DEL failed"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["del", "ok"]) + .inc(); } self.fallback.invalidate(key); } + + fn entry_count(&self) -> u64 { + self.fallback.entry_count() + } } #[cfg(test)] diff --git a/src/circuit_breaker.rs b/src/circuit_breaker.rs index 2c82d89..76d86ed 100644 --- a/src/circuit_breaker.rs +++ b/src/circuit_breaker.rs @@ -64,6 +64,10 @@ impl CircuitBreaker { .circuit_breaker_state .with_label_values(&[project]) .set(STATE_HALF_OPEN as f64); + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, "open", "half_open"]) + .inc(); } true } else { @@ -79,7 +83,8 @@ impl CircuitBreaker { let health = self.project_health(project); health.consecutive_failures.store(0, Ordering::Release); - let mut current = health.state.load(Ordering::Acquire); + let previous = health.state.load(Ordering::Acquire); + let mut current = previous; while current != STATE_CLOSED { match health.state.compare_exchange( current, @@ -87,7 +92,13 @@ impl CircuitBreaker { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => break, + Ok(_) => { + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, state_name(current), "closed"]) + .inc(); + break; + } Err(actual) => current = actual, } } @@ -114,7 +125,13 @@ impl CircuitBreaker { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => break, + Ok(_) => { + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, state_name(current), "open"]) + .inc(); + break; + } Err(actual) => current = actual, } } @@ -134,6 +151,16 @@ impl CircuitBreaker { } } +#[inline] +fn state_name(state: u8) -> &'static str { + match state { + STATE_CLOSED => "closed", + STATE_OPEN => "open", + STATE_HALF_OPEN => "half_open", + _ => "unknown", + } +} + #[inline] fn now_epoch_secs() -> u64 { SystemTime::now() diff --git a/src/discovery.rs b/src/discovery.rs index 9723c74..47c0a6a 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -4,7 +4,7 @@ use arc_swap::ArcSwap; use secrecy::{ExposeSecret, SecretString}; use serde::Deserialize; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tracing::{debug, error, info}; const DISCOVERY_CACHE_KEY: &str = "discovery:projects"; @@ -187,8 +187,16 @@ impl Discoverer { } async fn refresh(&self) { + let start = Instant::now(); match self.fetch_proxy_cache_projects().await { Ok(projects) => { + let elapsed = start.elapsed().as_secs_f64(); + metrics::global().discovery_duration.observe(elapsed); + metrics::global().discovery_last_success_timestamp.set( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0.0, |d| d.as_secs_f64()), + ); let count = projects.len(); self.persist_to_cache(&projects).await; self.inner.projects.store(Arc::new(projects)); @@ -201,6 +209,9 @@ impl Discoverer { ); } Err(e) => { + let elapsed = start.elapsed().as_secs_f64(); + metrics::global().discovery_duration.observe(elapsed); + metrics::global().discovery_errors_total.inc(); error!( event = "discovery", error = %e, diff --git a/src/main.rs b/src/main.rs index be9d5ab..5c8d428 100644 --- a/src/main.rs +++ b/src/main.rs @@ -428,6 +428,7 @@ async fn rate_limit_middleware( match limiter.check_key(&ip) { Ok(_) => next.run(req).await, Err(_) => { + metrics::global().rate_limit_rejected_total.inc(); warn!( event = "rate_limit", client_ip = %ip, diff --git a/src/metrics.rs b/src/metrics.rs index c8505dd..f4007ea 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,8 +1,8 @@ use dashmap::DashMap; use prometheus::{ exponential_buckets, register_counter, register_counter_vec, register_gauge, - register_gauge_vec, register_histogram_vec, Counter, CounterVec, Gauge, GaugeVec, HistogramVec, - TextEncoder, + register_gauge_vec, register_histogram, register_histogram_vec, Counter, CounterVec, Gauge, + GaugeVec, Histogram, HistogramVec, TextEncoder, }; use std::sync::{Arc, OnceLock}; @@ -18,27 +18,87 @@ const MIN_REQUESTS_FOR_TOP_N: u64 = 10; const TOP_N_IMAGES: usize = 100; pub struct Metrics { + // ─── HTTP layer (proxy) ─────────────────────────────────────────────────── /// Total registry API requests by method / type / status. pub requests_total: CounterVec, + /// Overall HTTP request duration (all request types). + pub request_duration: HistogramVec, + /// Currently in-flight client requests. + pub inflight_requests: Gauge, + /// Response size in bytes (from Content-Length when available). + pub response_bytes_total: CounterVec, + /// Requests rejected by per-IP rate limiter. + pub rate_limit_rejected_total: Counter, + + // ─── Resolver ───────────────────────────────────────────────────────────── /// Manifest resolve duration: result label = "hit" | "miss" | "error". pub resolve_duration: HistogramVec, + /// Tags resolve duration: result = "ok" | "error". + pub tags_resolve_duration: HistogramVec, /// Cache lookup counter: result = "hit" | "miss". pub cache_lookups_total: CounterVec, - /// Current number of discovered proxy-cache projects. - pub discovered_projects: Gauge, - /// Requests sent to upstream Harbor projects. - pub upstream_requests_total: CounterVec, - /// Currently in-flight client requests. - pub inflight_requests: Gauge, + /// Negative cache hits (image not found served from cache). + pub negative_cache_hits_total: Counter, + /// Stale cache entries served while revalidating in background. + pub cache_stale_serves_total: Counter, + + // ─── Singleflight ───────────────────────────────────────────────────────── /// Requests deduplicated by the singleflight coalescer. pub singleflight_dedup_total: Counter, + /// Currently active singleflight groups (unique keys being resolved). + pub singleflight_inflight: Gauge, + /// Duration followers waited for the singleflight leader. + pub singleflight_wait_duration: Histogram, + + // ─── Upstream ───────────────────────────────────────────────────────────── + /// Requests sent to upstream Harbor projects. + pub upstream_requests_total: CounterVec, + /// Upstream request duration by project. + pub upstream_project_duration: HistogramVec, + /// Upstream connection-level errors by reason (timeout/connect/other). + pub upstream_connection_errors_total: CounterVec, /// Blob proxy duration: result = "ok" | "error" | "fallback". pub blob_proxy_duration: HistogramVec, - /// Upstream request duration by project: result = "ok" | "error". - pub upstream_project_duration: HistogramVec, + /// Blob HEAD probe outcomes: result = "found" | "not_found". + pub blob_probe_total: CounterVec, + /// Number of projects in parallel fanout per resolve. + pub fanout_size: Histogram, + + // ─── Discovery ──────────────────────────────────────────────────────────── + /// Current number of discovered proxy-cache projects. + pub discovered_projects: Gauge, + /// Total discovery refresh failures. + pub discovery_errors_total: Counter, + /// Epoch timestamp of last successful discovery refresh. + pub discovery_last_success_timestamp: Gauge, + /// Duration of discovery refresh cycle. + pub discovery_duration: Histogram, + + // ─── Circuit breaker ────────────────────────────────────────────────────── + /// Circuit breaker state by project: 0=closed, 1=open, 2=half-open. pub circuit_breaker_state: GaugeVec, + /// Circuit breaker state transitions. + pub circuit_breaker_transitions_total: CounterVec, + + // ─── Retries ────────────────────────────────────────────────────────────── pub retries_total: CounterVec, + // ─── Redis backend ──────────────────────────────────────────────────────── + /// Redis operations by type and result. + pub redis_operations_total: CounterVec, + /// Times local Moka fallback was used due to Redis failure. + pub redis_fallback_total: Counter, + /// Redis Sentinel reconnection attempts. + pub redis_reconnections_total: Counter, + + // ─── Cache backend ──────────────────────────────────────────────────────── + /// Current number of entries in local cache. + pub cache_entries: GaugeVec, + + // ─── Build info ─────────────────────────────────────────────────────────── + /// Build metadata (version, commit). Always 1. + pub build_info: GaugeVec, + // ─── Image popularity tracking (lock-free) ──────────────────────────────── /// Per-image manifest request counts. /// Key: "image:tag" or "image@sha256:..." @@ -57,94 +117,249 @@ pub struct Metrics { static METRICS: OnceLock = OnceLock::new(); pub fn global() -> &'static Metrics { - METRICS.get_or_init(|| Metrics { - requests_total: register_counter_vec!( - "harbor_router_requests_total", - "Total number of registry API requests.", - &["method", "type", "status"] - ) - .expect("register requests_total"), - - resolve_duration: register_histogram_vec!( - "harbor_router_resolve_duration_seconds", - "Duration of manifest resolution in seconds.", - &["result"], - exponential_buckets(0.005, 2.0, 14).expect("buckets") - ) - .expect("register resolve_duration"), - - cache_lookups_total: register_counter_vec!( - "harbor_router_cache_lookups_total", - "Total cache lookups by result.", - &["result"] - ) - .expect("register cache_lookups_total"), - - discovered_projects: register_gauge!( - "harbor_router_discovered_projects", - "Number of currently discovered proxy-cache projects." - ) - .expect("register discovered_projects"), - - upstream_requests_total: register_counter_vec!( - "harbor_router_upstream_requests_total", - "Total requests to upstream Harbor proxy-cache projects.", - &["project", "status"] - ) - .expect("register upstream_requests_total"), - - inflight_requests: register_gauge!( - "harbor_router_inflight_requests", - "Number of currently in-flight client requests." - ) - .expect("register inflight_requests"), - - singleflight_dedup_total: register_counter!( - "harbor_router_singleflight_dedup_total", - "Total number of requests deduplicated by singleflight." - ) - .expect("register singleflight_dedup_total"), - - blob_proxy_duration: register_histogram_vec!( - "harbor_router_blob_proxy_duration_seconds", - "Duration of blob proxy requests in seconds.", - &["result"], - exponential_buckets(0.01, 2.0, 14).expect("buckets") - ) - .expect("register blob_proxy_duration"), - - upstream_project_duration: register_histogram_vec!( - "harbor_router_upstream_project_duration_seconds", - "Duration of upstream requests by project", - &["project"], - exponential_buckets(0.005, 2.0, 14).expect("buckets") - ) - .expect("register upstream_project_duration"), - - circuit_breaker_state: register_gauge_vec!( - "harbor_router_circuit_breaker_state", - "Circuit breaker state by project: 0=closed, 1=open, 2=half-open.", - &["project"] - ) - .expect("register circuit_breaker_state"), - - retries_total: register_counter_vec!( - "harbor_router_retries_total", - "Total retry attempts by project and reason.", - &["project", "reason"] - ) - .expect("register retries_total"), - - // Image popularity tracking - image_manifest_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), - image_blob_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), - - image_requests_total: register_counter_vec!( - "harbor_router_image_requests_total", - "Total image requests by type (manifest/blob).", - &["type"] - ) - .expect("register image_requests_total"), + METRICS.get_or_init(|| { + let m = Metrics { + // HTTP layer + requests_total: register_counter_vec!( + "harbor_router_requests_total", + "Total number of registry API requests.", + &["method", "type", "status"] + ) + .expect("register requests_total"), + + request_duration: register_histogram_vec!( + "harbor_router_request_duration_seconds", + "Overall HTTP request duration in seconds.", + &["method", "req_type", "status_class"], + exponential_buckets(0.001, 2.0, 16).expect("buckets") + ) + .expect("register request_duration"), + + inflight_requests: register_gauge!( + "harbor_router_inflight_requests", + "Number of currently in-flight client requests." + ) + .expect("register inflight_requests"), + + response_bytes_total: register_counter_vec!( + "harbor_router_response_bytes_total", + "Total response bytes by request type (from Content-Length).", + &["req_type"] + ) + .expect("register response_bytes_total"), + + rate_limit_rejected_total: register_counter!( + "harbor_router_rate_limit_rejected_total", + "Total requests rejected by rate limiter." + ) + .expect("register rate_limit_rejected_total"), + + // Resolver + resolve_duration: register_histogram_vec!( + "harbor_router_resolve_duration_seconds", + "Duration of manifest resolution in seconds.", + &["result"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register resolve_duration"), + + tags_resolve_duration: register_histogram_vec!( + "harbor_router_tags_resolve_duration_seconds", + "Duration of tags resolution in seconds.", + &["result"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register tags_resolve_duration"), + + cache_lookups_total: register_counter_vec!( + "harbor_router_cache_lookups_total", + "Total cache lookups by result.", + &["result"] + ) + .expect("register cache_lookups_total"), + + negative_cache_hits_total: register_counter!( + "harbor_router_negative_cache_hits_total", + "Total negative cache hits (image not found served from cache)." + ) + .expect("register negative_cache_hits_total"), + + cache_stale_serves_total: register_counter!( + "harbor_router_cache_stale_serves_total", + "Total stale cache entries served while revalidating." + ) + .expect("register cache_stale_serves_total"), + + // Singleflight + singleflight_dedup_total: register_counter!( + "harbor_router_singleflight_dedup_total", + "Total number of requests deduplicated by singleflight." + ) + .expect("register singleflight_dedup_total"), + + singleflight_inflight: register_gauge!( + "harbor_router_singleflight_inflight", + "Number of currently active singleflight groups." + ) + .expect("register singleflight_inflight"), + + singleflight_wait_duration: register_histogram!( + "harbor_router_singleflight_wait_duration_seconds", + "Duration followers waited for singleflight leader.", + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register singleflight_wait_duration"), + + // Upstream + upstream_requests_total: register_counter_vec!( + "harbor_router_upstream_requests_total", + "Total requests to upstream Harbor proxy-cache projects.", + &["project", "status"] + ) + .expect("register upstream_requests_total"), + + upstream_project_duration: register_histogram_vec!( + "harbor_router_upstream_project_duration_seconds", + "Duration of upstream requests by project.", + &["project"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register upstream_project_duration"), + + upstream_connection_errors_total: register_counter_vec!( + "harbor_router_upstream_connection_errors_total", + "Total upstream connection-level errors by reason.", + &["reason"] + ) + .expect("register upstream_connection_errors_total"), + + blob_proxy_duration: register_histogram_vec!( + "harbor_router_blob_proxy_duration_seconds", + "Duration of blob proxy requests in seconds.", + &["result"], + exponential_buckets(0.01, 2.0, 14).expect("buckets") + ) + .expect("register blob_proxy_duration"), + + blob_probe_total: register_counter_vec!( + "harbor_router_blob_probe_total", + "Total blob HEAD probe outcomes.", + &["result"] + ) + .expect("register blob_probe_total"), + + fanout_size: register_histogram!( + "harbor_router_fanout_size", + "Number of projects in parallel fanout per resolve.", + vec![1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0] + ) + .expect("register fanout_size"), + + // Discovery + discovered_projects: register_gauge!( + "harbor_router_discovered_projects", + "Number of currently discovered proxy-cache projects." + ) + .expect("register discovered_projects"), + + discovery_errors_total: register_counter!( + "harbor_router_discovery_errors_total", + "Total discovery refresh failures." + ) + .expect("register discovery_errors_total"), + + discovery_last_success_timestamp: register_gauge!( + "harbor_router_discovery_last_success_timestamp_seconds", + "Epoch timestamp of last successful discovery refresh." + ) + .expect("register discovery_last_success_timestamp"), + + discovery_duration: register_histogram!( + "harbor_router_discovery_duration_seconds", + "Duration of discovery refresh cycle.", + exponential_buckets(0.01, 2.0, 12).expect("buckets") + ) + .expect("register discovery_duration"), + + // Circuit breaker + circuit_breaker_state: register_gauge_vec!( + "harbor_router_circuit_breaker_state", + "Circuit breaker state by project: 0=closed, 1=open, 2=half-open.", + &["project"] + ) + .expect("register circuit_breaker_state"), + + circuit_breaker_transitions_total: register_counter_vec!( + "harbor_router_circuit_breaker_transitions_total", + "Total circuit breaker state transitions.", + &["project", "from", "to"] + ) + .expect("register circuit_breaker_transitions_total"), + + // Retries + retries_total: register_counter_vec!( + "harbor_router_retries_total", + "Total retry attempts by project and reason.", + &["project", "reason"] + ) + .expect("register retries_total"), + + // Redis backend + redis_operations_total: register_counter_vec!( + "harbor_router_redis_operations_total", + "Total Redis operations by type and result.", + &["operation", "result"] + ) + .expect("register redis_operations_total"), + + redis_fallback_total: register_counter!( + "harbor_router_redis_fallback_total", + "Total times local Moka fallback was used due to Redis failure." + ) + .expect("register redis_fallback_total"), + + redis_reconnections_total: register_counter!( + "harbor_router_redis_reconnections_total", + "Total Redis Sentinel reconnection attempts." + ) + .expect("register redis_reconnections_total"), + + // Cache backend + cache_entries: register_gauge_vec!( + "harbor_router_cache_entries", + "Current number of entries in cache.", + &["backend"] + ) + .expect("register cache_entries"), + + // Build info + build_info: register_gauge_vec!( + "harbor_router_build_info", + "Build information.", + &["version", "commit"] + ) + .expect("register build_info"), + + // Image popularity tracking + image_manifest_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), + image_blob_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), + + image_requests_total: register_counter_vec!( + "harbor_router_image_requests_total", + "Total image requests by type (manifest/blob).", + &["type"] + ) + .expect("register image_requests_total"), + }; + + m.build_info + .with_label_values(&[ + env!("CARGO_PKG_VERSION"), + option_env!("GIT_COMMIT_HASH").unwrap_or("unknown"), + ]) + .set(1.0); + + m }) } @@ -308,170 +523,247 @@ fn escape_label_value(s: &str) -> String { mod tests { use super::*; - #[test] - fn test_escape_label_value() { - assert_eq!(escape_label_value("simple"), "simple"); - assert_eq!(escape_label_value("with\"quote"), "with\\\"quote"); - assert_eq!(escape_label_value("with\\slash"), "with\\\\slash"); - assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline"); - } - - #[test] - fn test_record_manifest_request() { - let metrics = Metrics { + /// Creates a test `Metrics` instance with unique metric names to avoid + /// Prometheus global registry conflicts between tests. + fn test_metrics(prefix: &str) -> Metrics { + Metrics { + // HTTP layer requests_total: register_counter_vec!( - "test_manifest_requests_total", + format!("{}_requests_total", prefix), "test", &["method", "type", "status"] ) .unwrap(), + request_duration: register_histogram_vec!( + format!("{}_request_duration", prefix), + "test", + &["method", "req_type", "status_class"], + exponential_buckets(0.001, 2.0, 16).unwrap() + ) + .unwrap(), + inflight_requests: register_gauge!(format!("{}_inflight_requests", prefix), "test") + .unwrap(), + response_bytes_total: register_counter_vec!( + format!("{}_response_bytes_total", prefix), + "test", + &["req_type"] + ) + .unwrap(), + rate_limit_rejected_total: register_counter!( + format!("{}_rate_limit_rejected_total", prefix), + "test" + ) + .unwrap(), + + // Resolver resolve_duration: register_histogram_vec!( - "test_manifest_resolve_duration", + format!("{}_resolve_duration", prefix), + "test", + &["result"], + exponential_buckets(0.005, 2.0, 14).unwrap() + ) + .unwrap(), + tags_resolve_duration: register_histogram_vec!( + format!("{}_tags_resolve_duration", prefix), "test", &["result"], exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), cache_lookups_total: register_counter_vec!( - "test_manifest_cache_lookups", + format!("{}_cache_lookups", prefix), "test", &["result"] ) .unwrap(), - discovered_projects: register_gauge!("test_manifest_discovered_projects", "test") - .unwrap(), - upstream_requests_total: register_counter_vec!( - "test_manifest_upstream_requests", - "test", - &["project", "status"] + negative_cache_hits_total: register_counter!( + format!("{}_negative_cache_hits_total", prefix), + "test" ) .unwrap(), - inflight_requests: register_gauge!("test_manifest_inflight_requests", "test").unwrap(), - singleflight_dedup_total: register_counter!("test_manifest_singleflight_dedup", "test") - .unwrap(), - blob_proxy_duration: register_histogram_vec!( - "test_manifest_blob_proxy_duration", - "test", - &["result"], - exponential_buckets(0.01, 2.0, 14).unwrap() + cache_stale_serves_total: register_counter!( + format!("{}_cache_stale_serves_total", prefix), + "test" ) .unwrap(), - upstream_project_duration: register_histogram_vec!( - "test_manifest_upstream_project_duration", - "test", - &["project"], - exponential_buckets(0.005, 2.0, 14).unwrap() + + // Singleflight + singleflight_dedup_total: register_counter!( + format!("{}_singleflight_dedup", prefix), + "test" ) .unwrap(), - circuit_breaker_state: register_gauge_vec!( - "test_manifest_circuit_breaker_state", + singleflight_inflight: register_gauge!( + format!("{}_singleflight_inflight", prefix), + "test" + ) + .unwrap(), + singleflight_wait_duration: register_histogram!( + format!("{}_singleflight_wait_duration", prefix), "test", - &["project"] + exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), - retries_total: register_counter_vec!( - "test_manifest_retries_total", + + // Upstream + upstream_requests_total: register_counter_vec!( + format!("{}_upstream_requests", prefix), "test", - &["project", "reason"] + &["project", "status"] ) .unwrap(), - image_manifest_requests: Arc::new(DashMap::new()), - image_blob_requests: Arc::new(DashMap::new()), - image_requests_total: register_counter_vec!( - "test_manifest_image_requests", + upstream_project_duration: register_histogram_vec!( + format!("{}_upstream_project_duration", prefix), "test", - &["type"] + &["project"], + exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), - }; - - // Record some requests - metrics.record_manifest_request("nginx", "latest"); - metrics.record_manifest_request("nginx", "latest"); - metrics.record_manifest_request("redis", "7.0"); - - // Check counts - assert_eq!( - *metrics.image_manifest_requests.get("nginx:latest").unwrap(), - 2 - ); - assert_eq!( - *metrics.image_manifest_requests.get("redis:7.0").unwrap(), - 1 - ); - } - - #[test] - fn test_upstream_project_duration_histogram() { - let metrics = Metrics { - requests_total: register_counter_vec!( - "test_histogram_requests_total", + upstream_connection_errors_total: register_counter_vec!( + format!("{}_upstream_connection_errors_total", prefix), "test", - &["method", "type", "status"] + &["reason"] ) .unwrap(), - resolve_duration: register_histogram_vec!( - "test_histogram_resolve_duration", + blob_proxy_duration: register_histogram_vec!( + format!("{}_blob_proxy_duration", prefix), "test", &["result"], - exponential_buckets(0.005, 2.0, 14).unwrap() + exponential_buckets(0.01, 2.0, 14).unwrap() ) .unwrap(), - cache_lookups_total: register_counter_vec!( - "test_histogram_cache_lookups", + blob_probe_total: register_counter_vec!( + format!("{}_blob_probe_total", prefix), "test", &["result"] ) .unwrap(), - discovered_projects: register_gauge!("test_histogram_discovered_projects", "test") - .unwrap(), - upstream_requests_total: register_counter_vec!( - "test_histogram_upstream_requests", + fanout_size: register_histogram!( + format!("{}_fanout_size", prefix), "test", - &["project", "status"] + vec![1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0] ) .unwrap(), - inflight_requests: register_gauge!("test_histogram_inflight_requests", "test").unwrap(), - singleflight_dedup_total: register_counter!( - "test_histogram_singleflight_dedup", + + // Discovery + discovered_projects: register_gauge!(format!("{}_discovered_projects", prefix), "test") + .unwrap(), + discovery_errors_total: register_counter!( + format!("{}_discovery_errors_total", prefix), "test" ) .unwrap(), - blob_proxy_duration: register_histogram_vec!( - "test_histogram_blob_proxy_duration", - "test", - &["result"], - exponential_buckets(0.01, 2.0, 14).unwrap() + discovery_last_success_timestamp: register_gauge!( + format!("{}_discovery_last_success_timestamp", prefix), + "test" ) .unwrap(), - upstream_project_duration: register_histogram_vec!( - "test_histogram_upstream_project_duration", + discovery_duration: register_histogram!( + format!("{}_discovery_duration", prefix), "test", - &["project"], - exponential_buckets(0.005, 2.0, 14).unwrap() + exponential_buckets(0.01, 2.0, 12).unwrap() ) .unwrap(), + + // Circuit breaker circuit_breaker_state: register_gauge_vec!( - "test_histogram_circuit_breaker_state", + format!("{}_circuit_breaker_state", prefix), "test", &["project"] ) .unwrap(), + circuit_breaker_transitions_total: register_counter_vec!( + format!("{}_circuit_breaker_transitions_total", prefix), + "test", + &["project", "from", "to"] + ) + .unwrap(), + + // Retries retries_total: register_counter_vec!( - "test_histogram_retries_total", + format!("{}_retries_total", prefix), "test", &["project", "reason"] ) .unwrap(), + + // Redis backend + redis_operations_total: register_counter_vec!( + format!("{}_redis_operations_total", prefix), + "test", + &["operation", "result"] + ) + .unwrap(), + redis_fallback_total: register_counter!( + format!("{}_redis_fallback_total", prefix), + "test" + ) + .unwrap(), + redis_reconnections_total: register_counter!( + format!("{}_redis_reconnections_total", prefix), + "test" + ) + .unwrap(), + + // Cache backend + cache_entries: register_gauge_vec!( + format!("{}_cache_entries", prefix), + "test", + &["backend"] + ) + .unwrap(), + + // Build info + build_info: register_gauge_vec!( + format!("{}_build_info", prefix), + "test", + &["version", "commit"] + ) + .unwrap(), + + // Image popularity tracking image_manifest_requests: Arc::new(DashMap::new()), image_blob_requests: Arc::new(DashMap::new()), image_requests_total: register_counter_vec!( - "test_histogram_image_requests", + format!("{}_image_requests", prefix), "test", &["type"] ) .unwrap(), - }; + } + } + + #[test] + fn test_escape_label_value() { + assert_eq!(escape_label_value("simple"), "simple"); + assert_eq!(escape_label_value("with\"quote"), "with\\\"quote"); + assert_eq!(escape_label_value("with\\slash"), "with\\\\slash"); + assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline"); + } + + #[test] + fn test_record_manifest_request() { + let metrics = test_metrics("test_manifest"); + + // Record some requests + metrics.record_manifest_request("nginx", "latest"); + metrics.record_manifest_request("nginx", "latest"); + metrics.record_manifest_request("redis", "7.0"); + + // Check counts + assert_eq!( + *metrics.image_manifest_requests.get("nginx:latest").unwrap(), + 2 + ); + assert_eq!( + *metrics.image_manifest_requests.get("redis:7.0").unwrap(), + 1 + ); + } + + #[test] + fn test_upstream_project_duration_histogram() { + let metrics = test_metrics("test_histogram"); // Record observations for different projects metrics diff --git a/src/proxy.rs b/src/proxy.rs index 6c03e40..c52cbdf 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -197,10 +197,15 @@ async fn handle_manifest( async fn handle_tags(state: &AppState, image: &str, auth: Option<&str>) -> Response { let start = Instant::now(); let result = state.resolver.resolve_tags(image, auth).await; - let duration_ms = start.elapsed().as_millis() as u64; + let elapsed = start.elapsed().as_secs_f64(); + let duration_ms = (elapsed * 1000.0) as u64; match result { Err(e) => { + metrics::global() + .tags_resolve_duration + .with_label_values(&["error"]) + .observe(elapsed); error!( event = "tags_resolve", image, @@ -216,6 +221,10 @@ async fn handle_tags(state: &AppState, image: &str, auth: Option<&str>) -> Respo ) } Ok(r) => { + metrics::global() + .tags_resolve_duration + .with_label_values(&["ok"]) + .observe(elapsed); info!( event = "tags_resolve", image, @@ -393,6 +402,10 @@ async fn probe_blob_project( let s = resp.status().as_u16(); // 200 = blob present; 307 = redirect to storage backend (also present). if s == 200 || s == 307 { + metrics::global() + .blob_probe_total + .with_label_values(&["found"]) + .inc(); return Ok(proj); } } @@ -400,6 +413,10 @@ async fn probe_blob_project( } } + metrics::global() + .blob_probe_total + .with_label_values(&["not_found"]) + .inc(); anyhow::bail!("blob {}/{} not found in any project", image, digest); } @@ -463,6 +480,19 @@ pub async fn logging_middleware(req: Request, next: Next) -> Response { .requests_total .with_label_values(&[method.as_str(), req_type, status_class]) .inc(); + metrics::global() + .request_duration + .with_label_values(&[method.as_str(), req_type, status_class]) + .observe(start.elapsed().as_secs_f64()); + + if let Some(cl) = response.headers().get("content-length") { + if let Ok(bytes) = cl.to_str().unwrap_or("0").parse::() { + metrics::global() + .response_bytes_total + .with_label_values(&[req_type]) + .inc_by(bytes); + } + } // Log level based on status and type // - 4xx/5xx → WARN (errors should be visible) diff --git a/src/resolver.rs b/src/resolver.rs index b280b30..20fd889 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -160,6 +160,7 @@ impl Resolver { ); if project == NEGATIVE_CACHE_SENTINEL { + metrics::global().negative_cache_hits_total.inc(); bail!( "image {}:{} not found in any proxy-cache project", image, @@ -179,6 +180,7 @@ impl Resolver { { Ok(r) if r.status == 200 => { if is_stale { + metrics::global().cache_stale_serves_total.inc(); let resolver = self.clone(); let image_owned = image.to_string(); let reference_owned = reference.to_string(); @@ -345,6 +347,7 @@ impl Resolver { if !is_leader { metrics::global().singleflight_dedup_total.inc(); + let wait_start = Instant::now(); debug!( event = "singleflight", image, @@ -357,6 +360,9 @@ impl Resolver { { let current = rx.borrow_and_update(); if let Some(ref result) = *current { + metrics::global() + .singleflight_wait_duration + .observe(wait_start.elapsed().as_secs_f64()); return result.clone().map_err(|e| anyhow!("{}", e)); } } @@ -366,6 +372,9 @@ impl Resolver { .await .map_err(|_| anyhow!("singleflight: follower timed out waiting for leader"))? .map_err(|_| anyhow!("singleflight: leader dropped channel"))?; + metrics::global() + .singleflight_wait_duration + .observe(wait_start.elapsed().as_secs_f64()); let result = rx.borrow().clone(); return result .ok_or_else(|| anyhow!("singleflight: leader sent empty result"))? @@ -373,10 +382,12 @@ impl Resolver { } // We are the leader — do the actual work. + metrics::global().singleflight_inflight.inc(); let res = self .parallel_lookup(image, reference, auth, accept) .await .map(Arc::new); + metrics::global().singleflight_inflight.dec(); // Publish result to waiters (watch retains value for late subscribers). let watch_val = res.as_ref().map(Arc::clone).map_err(|e| e.to_string()); @@ -480,6 +491,8 @@ impl Resolver { bail!("no available proxy-cache projects (all circuits open)"); } + metrics::global().fanout_size.observe(futures.len() as f64); + // Use FuturesUnordered via buffer_unordered to return as soon as the // first matching 200 response arrives, cancelling remaining futures. // If no content-type matches the client's Accept header, fall back to @@ -590,10 +603,20 @@ impl Resolver { } let start = std::time::Instant::now(); - let resp = req - .send() - .await - .map_err(|e| anyhow!("request to {}: {}", project, e))?; + let resp = req.send().await.map_err(|e| { + let reason = if e.is_timeout() { + "timeout" + } else if e.is_connect() { + "connect" + } else { + "other" + }; + metrics::global() + .upstream_connection_errors_total + .with_label_values(&[reason]) + .inc(); + anyhow!("request to {}: {}", project, e) + })?; let status = resp.status().as_u16(); metrics::global() @@ -861,6 +884,11 @@ impl Resolver { } pub async fn persist_hot_entries(&self) { + metrics::global() + .cache_entries + .with_label_values(&["local"]) + .set(self.cache.entry_count() as f64); + if self.cache_warmup_top_n == 0 { return; }