From 813652b5ecd3367673f62fcc7fe84722ad4c4aa3 Mon Sep 17 00:00:00 2001 From: Daniil Kivenko Date: Mon, 9 Mar 2026 16:48:46 +0000 Subject: [PATCH] feat(metrics): add 20 SRE-focused Prometheus metrics across all modules Instrument all modules with comprehensive observability metrics so on-call engineers can quickly diagnose production issues without guessing. New metrics cover: request duration histograms, response bytes, rate limit rejections, singleflight pressure, negative/stale cache hits, fanout size, upstream connection errors, blob probe outcomes, discovery health, circuit breaker transitions, Redis operation visibility, cache entry counts, and build info. Also refactors metrics test module with a shared constructor. --- src/cache.rs | 58 +++- src/circuit_breaker.rs | 33 +- src/discovery.rs | 13 +- src/main.rs | 1 + src/metrics.rs | 676 +++++++++++++++++++++++++++++------------ src/proxy.rs | 32 +- src/resolver.rs | 36 ++- 7 files changed, 645 insertions(+), 204 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index c41990f..52f0ff8 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,3 +1,4 @@ +use crate::metrics; use std::sync::Arc; use std::time::Duration; @@ -17,6 +18,10 @@ pub trait CacheBackend: Send + Sync { self.set(key, value).await; } async fn delete(&self, key: &str); + + fn entry_count(&self) -> u64 { + 0 + } } /// Type-erased cache handle, cheaply cloneable. @@ -100,6 +105,10 @@ impl CacheBackend for MokaCache { async fn delete(&self, key: &str) { self.inner.invalidate(key); } + + fn entry_count(&self) -> u64 { + self.inner.entry_count() + } } // ─── Redis / Redis Sentinel ───────────────────────────────────────────────── @@ -204,9 +213,9 @@ impl RedisCache { } } - /// Attempt to reconnect via Sentinel (cold path, mutex-protected). async fn try_reconnect(&self) { if let Ok(mut sentinel) = self.sentinel.try_lock() { + metrics::global().redis_reconnections_total.inc(); match sentinel.get_async_connection().await { Ok(new_conn) => { self.conn.store(Arc::new(new_conn)); @@ -217,7 +226,6 @@ impl RedisCache { } } } - // If try_lock fails, another task is already reconnecting — skip. } } @@ -227,8 +235,19 @@ impl CacheBackend for RedisCache { let redis_key = self.prefixed(key); let mut conn = (*self.conn.load_full()).clone(); match redis::AsyncCommands::get::<_, Option>(&mut conn, &redis_key).await { - Ok(v) => v, + Ok(v) => { + metrics::global() + .redis_operations_total + .with_label_values(&["get", "ok"]) + .inc(); + v + } Err(e) => { + metrics::global() + .redis_operations_total + .with_label_values(&["get", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis GET failed, falling back to moka"); self.try_reconnect().await; self.fallback.get(key) @@ -242,8 +261,18 @@ impl CacheBackend for RedisCache { let res: Result<(), _> = redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, self.ttl_secs).await; if let Err(e) = res { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis SET failed, falling back to moka"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "ok"]) + .inc(); } // Always populate local fallback for graceful degradation. self.fallback.insert(key, value); @@ -256,8 +285,18 @@ impl CacheBackend for RedisCache { let res: Result<(), _> = redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, ttl_secs).await; if let Err(e) = res { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "error"]) + .inc(); + metrics::global().redis_fallback_total.inc(); tracing::debug!(error = %e, key, "redis SET failed, falling back to moka"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["set", "ok"]) + .inc(); } // Always populate local fallback for graceful degradation. self.fallback.insert(key, value); @@ -267,11 +306,24 @@ impl CacheBackend for RedisCache { let redis_key = self.prefixed(key); let mut conn = (*self.conn.load_full()).clone(); if let Err(e) = redis::AsyncCommands::del::<_, ()>(&mut conn, &redis_key).await { + metrics::global() + .redis_operations_total + .with_label_values(&["del", "error"]) + .inc(); tracing::debug!(error = %e, key, "redis DEL failed"); self.try_reconnect().await; + } else { + metrics::global() + .redis_operations_total + .with_label_values(&["del", "ok"]) + .inc(); } self.fallback.invalidate(key); } + + fn entry_count(&self) -> u64 { + self.fallback.entry_count() + } } #[cfg(test)] diff --git a/src/circuit_breaker.rs b/src/circuit_breaker.rs index 2c82d89..76d86ed 100644 --- a/src/circuit_breaker.rs +++ b/src/circuit_breaker.rs @@ -64,6 +64,10 @@ impl CircuitBreaker { .circuit_breaker_state .with_label_values(&[project]) .set(STATE_HALF_OPEN as f64); + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, "open", "half_open"]) + .inc(); } true } else { @@ -79,7 +83,8 @@ impl CircuitBreaker { let health = self.project_health(project); health.consecutive_failures.store(0, Ordering::Release); - let mut current = health.state.load(Ordering::Acquire); + let previous = health.state.load(Ordering::Acquire); + let mut current = previous; while current != STATE_CLOSED { match health.state.compare_exchange( current, @@ -87,7 +92,13 @@ impl CircuitBreaker { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => break, + Ok(_) => { + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, state_name(current), "closed"]) + .inc(); + break; + } Err(actual) => current = actual, } } @@ -114,7 +125,13 @@ impl CircuitBreaker { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => break, + Ok(_) => { + metrics::global() + .circuit_breaker_transitions_total + .with_label_values(&[project, state_name(current), "open"]) + .inc(); + break; + } Err(actual) => current = actual, } } @@ -134,6 +151,16 @@ impl CircuitBreaker { } } +#[inline] +fn state_name(state: u8) -> &'static str { + match state { + STATE_CLOSED => "closed", + STATE_OPEN => "open", + STATE_HALF_OPEN => "half_open", + _ => "unknown", + } +} + #[inline] fn now_epoch_secs() -> u64 { SystemTime::now() diff --git a/src/discovery.rs b/src/discovery.rs index 9723c74..47c0a6a 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -4,7 +4,7 @@ use arc_swap::ArcSwap; use secrecy::{ExposeSecret, SecretString}; use serde::Deserialize; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tracing::{debug, error, info}; const DISCOVERY_CACHE_KEY: &str = "discovery:projects"; @@ -187,8 +187,16 @@ impl Discoverer { } async fn refresh(&self) { + let start = Instant::now(); match self.fetch_proxy_cache_projects().await { Ok(projects) => { + let elapsed = start.elapsed().as_secs_f64(); + metrics::global().discovery_duration.observe(elapsed); + metrics::global().discovery_last_success_timestamp.set( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0.0, |d| d.as_secs_f64()), + ); let count = projects.len(); self.persist_to_cache(&projects).await; self.inner.projects.store(Arc::new(projects)); @@ -201,6 +209,9 @@ impl Discoverer { ); } Err(e) => { + let elapsed = start.elapsed().as_secs_f64(); + metrics::global().discovery_duration.observe(elapsed); + metrics::global().discovery_errors_total.inc(); error!( event = "discovery", error = %e, diff --git a/src/main.rs b/src/main.rs index be9d5ab..5c8d428 100644 --- a/src/main.rs +++ b/src/main.rs @@ -428,6 +428,7 @@ async fn rate_limit_middleware( match limiter.check_key(&ip) { Ok(_) => next.run(req).await, Err(_) => { + metrics::global().rate_limit_rejected_total.inc(); warn!( event = "rate_limit", client_ip = %ip, diff --git a/src/metrics.rs b/src/metrics.rs index c8505dd..f4007ea 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,8 +1,8 @@ use dashmap::DashMap; use prometheus::{ exponential_buckets, register_counter, register_counter_vec, register_gauge, - register_gauge_vec, register_histogram_vec, Counter, CounterVec, Gauge, GaugeVec, HistogramVec, - TextEncoder, + register_gauge_vec, register_histogram, register_histogram_vec, Counter, CounterVec, Gauge, + GaugeVec, Histogram, HistogramVec, TextEncoder, }; use std::sync::{Arc, OnceLock}; @@ -18,27 +18,87 @@ const MIN_REQUESTS_FOR_TOP_N: u64 = 10; const TOP_N_IMAGES: usize = 100; pub struct Metrics { + // ─── HTTP layer (proxy) ─────────────────────────────────────────────────── /// Total registry API requests by method / type / status. pub requests_total: CounterVec, + /// Overall HTTP request duration (all request types). + pub request_duration: HistogramVec, + /// Currently in-flight client requests. + pub inflight_requests: Gauge, + /// Response size in bytes (from Content-Length when available). + pub response_bytes_total: CounterVec, + /// Requests rejected by per-IP rate limiter. + pub rate_limit_rejected_total: Counter, + + // ─── Resolver ───────────────────────────────────────────────────────────── /// Manifest resolve duration: result label = "hit" | "miss" | "error". pub resolve_duration: HistogramVec, + /// Tags resolve duration: result = "ok" | "error". + pub tags_resolve_duration: HistogramVec, /// Cache lookup counter: result = "hit" | "miss". pub cache_lookups_total: CounterVec, - /// Current number of discovered proxy-cache projects. - pub discovered_projects: Gauge, - /// Requests sent to upstream Harbor projects. - pub upstream_requests_total: CounterVec, - /// Currently in-flight client requests. - pub inflight_requests: Gauge, + /// Negative cache hits (image not found served from cache). + pub negative_cache_hits_total: Counter, + /// Stale cache entries served while revalidating in background. + pub cache_stale_serves_total: Counter, + + // ─── Singleflight ───────────────────────────────────────────────────────── /// Requests deduplicated by the singleflight coalescer. pub singleflight_dedup_total: Counter, + /// Currently active singleflight groups (unique keys being resolved). + pub singleflight_inflight: Gauge, + /// Duration followers waited for the singleflight leader. + pub singleflight_wait_duration: Histogram, + + // ─── Upstream ───────────────────────────────────────────────────────────── + /// Requests sent to upstream Harbor projects. + pub upstream_requests_total: CounterVec, + /// Upstream request duration by project. + pub upstream_project_duration: HistogramVec, + /// Upstream connection-level errors by reason (timeout/connect/other). + pub upstream_connection_errors_total: CounterVec, /// Blob proxy duration: result = "ok" | "error" | "fallback". pub blob_proxy_duration: HistogramVec, - /// Upstream request duration by project: result = "ok" | "error". - pub upstream_project_duration: HistogramVec, + /// Blob HEAD probe outcomes: result = "found" | "not_found". + pub blob_probe_total: CounterVec, + /// Number of projects in parallel fanout per resolve. + pub fanout_size: Histogram, + + // ─── Discovery ──────────────────────────────────────────────────────────── + /// Current number of discovered proxy-cache projects. + pub discovered_projects: Gauge, + /// Total discovery refresh failures. + pub discovery_errors_total: Counter, + /// Epoch timestamp of last successful discovery refresh. + pub discovery_last_success_timestamp: Gauge, + /// Duration of discovery refresh cycle. + pub discovery_duration: Histogram, + + // ─── Circuit breaker ────────────────────────────────────────────────────── + /// Circuit breaker state by project: 0=closed, 1=open, 2=half-open. pub circuit_breaker_state: GaugeVec, + /// Circuit breaker state transitions. + pub circuit_breaker_transitions_total: CounterVec, + + // ─── Retries ────────────────────────────────────────────────────────────── pub retries_total: CounterVec, + // ─── Redis backend ──────────────────────────────────────────────────────── + /// Redis operations by type and result. + pub redis_operations_total: CounterVec, + /// Times local Moka fallback was used due to Redis failure. + pub redis_fallback_total: Counter, + /// Redis Sentinel reconnection attempts. + pub redis_reconnections_total: Counter, + + // ─── Cache backend ──────────────────────────────────────────────────────── + /// Current number of entries in local cache. + pub cache_entries: GaugeVec, + + // ─── Build info ─────────────────────────────────────────────────────────── + /// Build metadata (version, commit). Always 1. + pub build_info: GaugeVec, + // ─── Image popularity tracking (lock-free) ──────────────────────────────── /// Per-image manifest request counts. /// Key: "image:tag" or "image@sha256:..." @@ -57,94 +117,249 @@ pub struct Metrics { static METRICS: OnceLock = OnceLock::new(); pub fn global() -> &'static Metrics { - METRICS.get_or_init(|| Metrics { - requests_total: register_counter_vec!( - "harbor_router_requests_total", - "Total number of registry API requests.", - &["method", "type", "status"] - ) - .expect("register requests_total"), - - resolve_duration: register_histogram_vec!( - "harbor_router_resolve_duration_seconds", - "Duration of manifest resolution in seconds.", - &["result"], - exponential_buckets(0.005, 2.0, 14).expect("buckets") - ) - .expect("register resolve_duration"), - - cache_lookups_total: register_counter_vec!( - "harbor_router_cache_lookups_total", - "Total cache lookups by result.", - &["result"] - ) - .expect("register cache_lookups_total"), - - discovered_projects: register_gauge!( - "harbor_router_discovered_projects", - "Number of currently discovered proxy-cache projects." - ) - .expect("register discovered_projects"), - - upstream_requests_total: register_counter_vec!( - "harbor_router_upstream_requests_total", - "Total requests to upstream Harbor proxy-cache projects.", - &["project", "status"] - ) - .expect("register upstream_requests_total"), - - inflight_requests: register_gauge!( - "harbor_router_inflight_requests", - "Number of currently in-flight client requests." - ) - .expect("register inflight_requests"), - - singleflight_dedup_total: register_counter!( - "harbor_router_singleflight_dedup_total", - "Total number of requests deduplicated by singleflight." - ) - .expect("register singleflight_dedup_total"), - - blob_proxy_duration: register_histogram_vec!( - "harbor_router_blob_proxy_duration_seconds", - "Duration of blob proxy requests in seconds.", - &["result"], - exponential_buckets(0.01, 2.0, 14).expect("buckets") - ) - .expect("register blob_proxy_duration"), - - upstream_project_duration: register_histogram_vec!( - "harbor_router_upstream_project_duration_seconds", - "Duration of upstream requests by project", - &["project"], - exponential_buckets(0.005, 2.0, 14).expect("buckets") - ) - .expect("register upstream_project_duration"), - - circuit_breaker_state: register_gauge_vec!( - "harbor_router_circuit_breaker_state", - "Circuit breaker state by project: 0=closed, 1=open, 2=half-open.", - &["project"] - ) - .expect("register circuit_breaker_state"), - - retries_total: register_counter_vec!( - "harbor_router_retries_total", - "Total retry attempts by project and reason.", - &["project", "reason"] - ) - .expect("register retries_total"), - - // Image popularity tracking - image_manifest_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), - image_blob_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), - - image_requests_total: register_counter_vec!( - "harbor_router_image_requests_total", - "Total image requests by type (manifest/blob).", - &["type"] - ) - .expect("register image_requests_total"), + METRICS.get_or_init(|| { + let m = Metrics { + // HTTP layer + requests_total: register_counter_vec!( + "harbor_router_requests_total", + "Total number of registry API requests.", + &["method", "type", "status"] + ) + .expect("register requests_total"), + + request_duration: register_histogram_vec!( + "harbor_router_request_duration_seconds", + "Overall HTTP request duration in seconds.", + &["method", "req_type", "status_class"], + exponential_buckets(0.001, 2.0, 16).expect("buckets") + ) + .expect("register request_duration"), + + inflight_requests: register_gauge!( + "harbor_router_inflight_requests", + "Number of currently in-flight client requests." + ) + .expect("register inflight_requests"), + + response_bytes_total: register_counter_vec!( + "harbor_router_response_bytes_total", + "Total response bytes by request type (from Content-Length).", + &["req_type"] + ) + .expect("register response_bytes_total"), + + rate_limit_rejected_total: register_counter!( + "harbor_router_rate_limit_rejected_total", + "Total requests rejected by rate limiter." + ) + .expect("register rate_limit_rejected_total"), + + // Resolver + resolve_duration: register_histogram_vec!( + "harbor_router_resolve_duration_seconds", + "Duration of manifest resolution in seconds.", + &["result"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register resolve_duration"), + + tags_resolve_duration: register_histogram_vec!( + "harbor_router_tags_resolve_duration_seconds", + "Duration of tags resolution in seconds.", + &["result"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register tags_resolve_duration"), + + cache_lookups_total: register_counter_vec!( + "harbor_router_cache_lookups_total", + "Total cache lookups by result.", + &["result"] + ) + .expect("register cache_lookups_total"), + + negative_cache_hits_total: register_counter!( + "harbor_router_negative_cache_hits_total", + "Total negative cache hits (image not found served from cache)." + ) + .expect("register negative_cache_hits_total"), + + cache_stale_serves_total: register_counter!( + "harbor_router_cache_stale_serves_total", + "Total stale cache entries served while revalidating." + ) + .expect("register cache_stale_serves_total"), + + // Singleflight + singleflight_dedup_total: register_counter!( + "harbor_router_singleflight_dedup_total", + "Total number of requests deduplicated by singleflight." + ) + .expect("register singleflight_dedup_total"), + + singleflight_inflight: register_gauge!( + "harbor_router_singleflight_inflight", + "Number of currently active singleflight groups." + ) + .expect("register singleflight_inflight"), + + singleflight_wait_duration: register_histogram!( + "harbor_router_singleflight_wait_duration_seconds", + "Duration followers waited for singleflight leader.", + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register singleflight_wait_duration"), + + // Upstream + upstream_requests_total: register_counter_vec!( + "harbor_router_upstream_requests_total", + "Total requests to upstream Harbor proxy-cache projects.", + &["project", "status"] + ) + .expect("register upstream_requests_total"), + + upstream_project_duration: register_histogram_vec!( + "harbor_router_upstream_project_duration_seconds", + "Duration of upstream requests by project.", + &["project"], + exponential_buckets(0.005, 2.0, 14).expect("buckets") + ) + .expect("register upstream_project_duration"), + + upstream_connection_errors_total: register_counter_vec!( + "harbor_router_upstream_connection_errors_total", + "Total upstream connection-level errors by reason.", + &["reason"] + ) + .expect("register upstream_connection_errors_total"), + + blob_proxy_duration: register_histogram_vec!( + "harbor_router_blob_proxy_duration_seconds", + "Duration of blob proxy requests in seconds.", + &["result"], + exponential_buckets(0.01, 2.0, 14).expect("buckets") + ) + .expect("register blob_proxy_duration"), + + blob_probe_total: register_counter_vec!( + "harbor_router_blob_probe_total", + "Total blob HEAD probe outcomes.", + &["result"] + ) + .expect("register blob_probe_total"), + + fanout_size: register_histogram!( + "harbor_router_fanout_size", + "Number of projects in parallel fanout per resolve.", + vec![1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0] + ) + .expect("register fanout_size"), + + // Discovery + discovered_projects: register_gauge!( + "harbor_router_discovered_projects", + "Number of currently discovered proxy-cache projects." + ) + .expect("register discovered_projects"), + + discovery_errors_total: register_counter!( + "harbor_router_discovery_errors_total", + "Total discovery refresh failures." + ) + .expect("register discovery_errors_total"), + + discovery_last_success_timestamp: register_gauge!( + "harbor_router_discovery_last_success_timestamp_seconds", + "Epoch timestamp of last successful discovery refresh." + ) + .expect("register discovery_last_success_timestamp"), + + discovery_duration: register_histogram!( + "harbor_router_discovery_duration_seconds", + "Duration of discovery refresh cycle.", + exponential_buckets(0.01, 2.0, 12).expect("buckets") + ) + .expect("register discovery_duration"), + + // Circuit breaker + circuit_breaker_state: register_gauge_vec!( + "harbor_router_circuit_breaker_state", + "Circuit breaker state by project: 0=closed, 1=open, 2=half-open.", + &["project"] + ) + .expect("register circuit_breaker_state"), + + circuit_breaker_transitions_total: register_counter_vec!( + "harbor_router_circuit_breaker_transitions_total", + "Total circuit breaker state transitions.", + &["project", "from", "to"] + ) + .expect("register circuit_breaker_transitions_total"), + + // Retries + retries_total: register_counter_vec!( + "harbor_router_retries_total", + "Total retry attempts by project and reason.", + &["project", "reason"] + ) + .expect("register retries_total"), + + // Redis backend + redis_operations_total: register_counter_vec!( + "harbor_router_redis_operations_total", + "Total Redis operations by type and result.", + &["operation", "result"] + ) + .expect("register redis_operations_total"), + + redis_fallback_total: register_counter!( + "harbor_router_redis_fallback_total", + "Total times local Moka fallback was used due to Redis failure." + ) + .expect("register redis_fallback_total"), + + redis_reconnections_total: register_counter!( + "harbor_router_redis_reconnections_total", + "Total Redis Sentinel reconnection attempts." + ) + .expect("register redis_reconnections_total"), + + // Cache backend + cache_entries: register_gauge_vec!( + "harbor_router_cache_entries", + "Current number of entries in cache.", + &["backend"] + ) + .expect("register cache_entries"), + + // Build info + build_info: register_gauge_vec!( + "harbor_router_build_info", + "Build information.", + &["version", "commit"] + ) + .expect("register build_info"), + + // Image popularity tracking + image_manifest_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), + image_blob_requests: Arc::new(DashMap::with_capacity(MAX_TRACKED_IMAGES)), + + image_requests_total: register_counter_vec!( + "harbor_router_image_requests_total", + "Total image requests by type (manifest/blob).", + &["type"] + ) + .expect("register image_requests_total"), + }; + + m.build_info + .with_label_values(&[ + env!("CARGO_PKG_VERSION"), + option_env!("GIT_COMMIT_HASH").unwrap_or("unknown"), + ]) + .set(1.0); + + m }) } @@ -308,170 +523,247 @@ fn escape_label_value(s: &str) -> String { mod tests { use super::*; - #[test] - fn test_escape_label_value() { - assert_eq!(escape_label_value("simple"), "simple"); - assert_eq!(escape_label_value("with\"quote"), "with\\\"quote"); - assert_eq!(escape_label_value("with\\slash"), "with\\\\slash"); - assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline"); - } - - #[test] - fn test_record_manifest_request() { - let metrics = Metrics { + /// Creates a test `Metrics` instance with unique metric names to avoid + /// Prometheus global registry conflicts between tests. + fn test_metrics(prefix: &str) -> Metrics { + Metrics { + // HTTP layer requests_total: register_counter_vec!( - "test_manifest_requests_total", + format!("{}_requests_total", prefix), "test", &["method", "type", "status"] ) .unwrap(), + request_duration: register_histogram_vec!( + format!("{}_request_duration", prefix), + "test", + &["method", "req_type", "status_class"], + exponential_buckets(0.001, 2.0, 16).unwrap() + ) + .unwrap(), + inflight_requests: register_gauge!(format!("{}_inflight_requests", prefix), "test") + .unwrap(), + response_bytes_total: register_counter_vec!( + format!("{}_response_bytes_total", prefix), + "test", + &["req_type"] + ) + .unwrap(), + rate_limit_rejected_total: register_counter!( + format!("{}_rate_limit_rejected_total", prefix), + "test" + ) + .unwrap(), + + // Resolver resolve_duration: register_histogram_vec!( - "test_manifest_resolve_duration", + format!("{}_resolve_duration", prefix), + "test", + &["result"], + exponential_buckets(0.005, 2.0, 14).unwrap() + ) + .unwrap(), + tags_resolve_duration: register_histogram_vec!( + format!("{}_tags_resolve_duration", prefix), "test", &["result"], exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), cache_lookups_total: register_counter_vec!( - "test_manifest_cache_lookups", + format!("{}_cache_lookups", prefix), "test", &["result"] ) .unwrap(), - discovered_projects: register_gauge!("test_manifest_discovered_projects", "test") - .unwrap(), - upstream_requests_total: register_counter_vec!( - "test_manifest_upstream_requests", - "test", - &["project", "status"] + negative_cache_hits_total: register_counter!( + format!("{}_negative_cache_hits_total", prefix), + "test" ) .unwrap(), - inflight_requests: register_gauge!("test_manifest_inflight_requests", "test").unwrap(), - singleflight_dedup_total: register_counter!("test_manifest_singleflight_dedup", "test") - .unwrap(), - blob_proxy_duration: register_histogram_vec!( - "test_manifest_blob_proxy_duration", - "test", - &["result"], - exponential_buckets(0.01, 2.0, 14).unwrap() + cache_stale_serves_total: register_counter!( + format!("{}_cache_stale_serves_total", prefix), + "test" ) .unwrap(), - upstream_project_duration: register_histogram_vec!( - "test_manifest_upstream_project_duration", - "test", - &["project"], - exponential_buckets(0.005, 2.0, 14).unwrap() + + // Singleflight + singleflight_dedup_total: register_counter!( + format!("{}_singleflight_dedup", prefix), + "test" ) .unwrap(), - circuit_breaker_state: register_gauge_vec!( - "test_manifest_circuit_breaker_state", + singleflight_inflight: register_gauge!( + format!("{}_singleflight_inflight", prefix), + "test" + ) + .unwrap(), + singleflight_wait_duration: register_histogram!( + format!("{}_singleflight_wait_duration", prefix), "test", - &["project"] + exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), - retries_total: register_counter_vec!( - "test_manifest_retries_total", + + // Upstream + upstream_requests_total: register_counter_vec!( + format!("{}_upstream_requests", prefix), "test", - &["project", "reason"] + &["project", "status"] ) .unwrap(), - image_manifest_requests: Arc::new(DashMap::new()), - image_blob_requests: Arc::new(DashMap::new()), - image_requests_total: register_counter_vec!( - "test_manifest_image_requests", + upstream_project_duration: register_histogram_vec!( + format!("{}_upstream_project_duration", prefix), "test", - &["type"] + &["project"], + exponential_buckets(0.005, 2.0, 14).unwrap() ) .unwrap(), - }; - - // Record some requests - metrics.record_manifest_request("nginx", "latest"); - metrics.record_manifest_request("nginx", "latest"); - metrics.record_manifest_request("redis", "7.0"); - - // Check counts - assert_eq!( - *metrics.image_manifest_requests.get("nginx:latest").unwrap(), - 2 - ); - assert_eq!( - *metrics.image_manifest_requests.get("redis:7.0").unwrap(), - 1 - ); - } - - #[test] - fn test_upstream_project_duration_histogram() { - let metrics = Metrics { - requests_total: register_counter_vec!( - "test_histogram_requests_total", + upstream_connection_errors_total: register_counter_vec!( + format!("{}_upstream_connection_errors_total", prefix), "test", - &["method", "type", "status"] + &["reason"] ) .unwrap(), - resolve_duration: register_histogram_vec!( - "test_histogram_resolve_duration", + blob_proxy_duration: register_histogram_vec!( + format!("{}_blob_proxy_duration", prefix), "test", &["result"], - exponential_buckets(0.005, 2.0, 14).unwrap() + exponential_buckets(0.01, 2.0, 14).unwrap() ) .unwrap(), - cache_lookups_total: register_counter_vec!( - "test_histogram_cache_lookups", + blob_probe_total: register_counter_vec!( + format!("{}_blob_probe_total", prefix), "test", &["result"] ) .unwrap(), - discovered_projects: register_gauge!("test_histogram_discovered_projects", "test") - .unwrap(), - upstream_requests_total: register_counter_vec!( - "test_histogram_upstream_requests", + fanout_size: register_histogram!( + format!("{}_fanout_size", prefix), "test", - &["project", "status"] + vec![1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0] ) .unwrap(), - inflight_requests: register_gauge!("test_histogram_inflight_requests", "test").unwrap(), - singleflight_dedup_total: register_counter!( - "test_histogram_singleflight_dedup", + + // Discovery + discovered_projects: register_gauge!(format!("{}_discovered_projects", prefix), "test") + .unwrap(), + discovery_errors_total: register_counter!( + format!("{}_discovery_errors_total", prefix), "test" ) .unwrap(), - blob_proxy_duration: register_histogram_vec!( - "test_histogram_blob_proxy_duration", - "test", - &["result"], - exponential_buckets(0.01, 2.0, 14).unwrap() + discovery_last_success_timestamp: register_gauge!( + format!("{}_discovery_last_success_timestamp", prefix), + "test" ) .unwrap(), - upstream_project_duration: register_histogram_vec!( - "test_histogram_upstream_project_duration", + discovery_duration: register_histogram!( + format!("{}_discovery_duration", prefix), "test", - &["project"], - exponential_buckets(0.005, 2.0, 14).unwrap() + exponential_buckets(0.01, 2.0, 12).unwrap() ) .unwrap(), + + // Circuit breaker circuit_breaker_state: register_gauge_vec!( - "test_histogram_circuit_breaker_state", + format!("{}_circuit_breaker_state", prefix), "test", &["project"] ) .unwrap(), + circuit_breaker_transitions_total: register_counter_vec!( + format!("{}_circuit_breaker_transitions_total", prefix), + "test", + &["project", "from", "to"] + ) + .unwrap(), + + // Retries retries_total: register_counter_vec!( - "test_histogram_retries_total", + format!("{}_retries_total", prefix), "test", &["project", "reason"] ) .unwrap(), + + // Redis backend + redis_operations_total: register_counter_vec!( + format!("{}_redis_operations_total", prefix), + "test", + &["operation", "result"] + ) + .unwrap(), + redis_fallback_total: register_counter!( + format!("{}_redis_fallback_total", prefix), + "test" + ) + .unwrap(), + redis_reconnections_total: register_counter!( + format!("{}_redis_reconnections_total", prefix), + "test" + ) + .unwrap(), + + // Cache backend + cache_entries: register_gauge_vec!( + format!("{}_cache_entries", prefix), + "test", + &["backend"] + ) + .unwrap(), + + // Build info + build_info: register_gauge_vec!( + format!("{}_build_info", prefix), + "test", + &["version", "commit"] + ) + .unwrap(), + + // Image popularity tracking image_manifest_requests: Arc::new(DashMap::new()), image_blob_requests: Arc::new(DashMap::new()), image_requests_total: register_counter_vec!( - "test_histogram_image_requests", + format!("{}_image_requests", prefix), "test", &["type"] ) .unwrap(), - }; + } + } + + #[test] + fn test_escape_label_value() { + assert_eq!(escape_label_value("simple"), "simple"); + assert_eq!(escape_label_value("with\"quote"), "with\\\"quote"); + assert_eq!(escape_label_value("with\\slash"), "with\\\\slash"); + assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline"); + } + + #[test] + fn test_record_manifest_request() { + let metrics = test_metrics("test_manifest"); + + // Record some requests + metrics.record_manifest_request("nginx", "latest"); + metrics.record_manifest_request("nginx", "latest"); + metrics.record_manifest_request("redis", "7.0"); + + // Check counts + assert_eq!( + *metrics.image_manifest_requests.get("nginx:latest").unwrap(), + 2 + ); + assert_eq!( + *metrics.image_manifest_requests.get("redis:7.0").unwrap(), + 1 + ); + } + + #[test] + fn test_upstream_project_duration_histogram() { + let metrics = test_metrics("test_histogram"); // Record observations for different projects metrics diff --git a/src/proxy.rs b/src/proxy.rs index 6c03e40..c52cbdf 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -197,10 +197,15 @@ async fn handle_manifest( async fn handle_tags(state: &AppState, image: &str, auth: Option<&str>) -> Response { let start = Instant::now(); let result = state.resolver.resolve_tags(image, auth).await; - let duration_ms = start.elapsed().as_millis() as u64; + let elapsed = start.elapsed().as_secs_f64(); + let duration_ms = (elapsed * 1000.0) as u64; match result { Err(e) => { + metrics::global() + .tags_resolve_duration + .with_label_values(&["error"]) + .observe(elapsed); error!( event = "tags_resolve", image, @@ -216,6 +221,10 @@ async fn handle_tags(state: &AppState, image: &str, auth: Option<&str>) -> Respo ) } Ok(r) => { + metrics::global() + .tags_resolve_duration + .with_label_values(&["ok"]) + .observe(elapsed); info!( event = "tags_resolve", image, @@ -393,6 +402,10 @@ async fn probe_blob_project( let s = resp.status().as_u16(); // 200 = blob present; 307 = redirect to storage backend (also present). if s == 200 || s == 307 { + metrics::global() + .blob_probe_total + .with_label_values(&["found"]) + .inc(); return Ok(proj); } } @@ -400,6 +413,10 @@ async fn probe_blob_project( } } + metrics::global() + .blob_probe_total + .with_label_values(&["not_found"]) + .inc(); anyhow::bail!("blob {}/{} not found in any project", image, digest); } @@ -463,6 +480,19 @@ pub async fn logging_middleware(req: Request, next: Next) -> Response { .requests_total .with_label_values(&[method.as_str(), req_type, status_class]) .inc(); + metrics::global() + .request_duration + .with_label_values(&[method.as_str(), req_type, status_class]) + .observe(start.elapsed().as_secs_f64()); + + if let Some(cl) = response.headers().get("content-length") { + if let Ok(bytes) = cl.to_str().unwrap_or("0").parse::() { + metrics::global() + .response_bytes_total + .with_label_values(&[req_type]) + .inc_by(bytes); + } + } // Log level based on status and type // - 4xx/5xx → WARN (errors should be visible) diff --git a/src/resolver.rs b/src/resolver.rs index b280b30..20fd889 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -160,6 +160,7 @@ impl Resolver { ); if project == NEGATIVE_CACHE_SENTINEL { + metrics::global().negative_cache_hits_total.inc(); bail!( "image {}:{} not found in any proxy-cache project", image, @@ -179,6 +180,7 @@ impl Resolver { { Ok(r) if r.status == 200 => { if is_stale { + metrics::global().cache_stale_serves_total.inc(); let resolver = self.clone(); let image_owned = image.to_string(); let reference_owned = reference.to_string(); @@ -345,6 +347,7 @@ impl Resolver { if !is_leader { metrics::global().singleflight_dedup_total.inc(); + let wait_start = Instant::now(); debug!( event = "singleflight", image, @@ -357,6 +360,9 @@ impl Resolver { { let current = rx.borrow_and_update(); if let Some(ref result) = *current { + metrics::global() + .singleflight_wait_duration + .observe(wait_start.elapsed().as_secs_f64()); return result.clone().map_err(|e| anyhow!("{}", e)); } } @@ -366,6 +372,9 @@ impl Resolver { .await .map_err(|_| anyhow!("singleflight: follower timed out waiting for leader"))? .map_err(|_| anyhow!("singleflight: leader dropped channel"))?; + metrics::global() + .singleflight_wait_duration + .observe(wait_start.elapsed().as_secs_f64()); let result = rx.borrow().clone(); return result .ok_or_else(|| anyhow!("singleflight: leader sent empty result"))? @@ -373,10 +382,12 @@ impl Resolver { } // We are the leader — do the actual work. + metrics::global().singleflight_inflight.inc(); let res = self .parallel_lookup(image, reference, auth, accept) .await .map(Arc::new); + metrics::global().singleflight_inflight.dec(); // Publish result to waiters (watch retains value for late subscribers). let watch_val = res.as_ref().map(Arc::clone).map_err(|e| e.to_string()); @@ -480,6 +491,8 @@ impl Resolver { bail!("no available proxy-cache projects (all circuits open)"); } + metrics::global().fanout_size.observe(futures.len() as f64); + // Use FuturesUnordered via buffer_unordered to return as soon as the // first matching 200 response arrives, cancelling remaining futures. // If no content-type matches the client's Accept header, fall back to @@ -590,10 +603,20 @@ impl Resolver { } let start = std::time::Instant::now(); - let resp = req - .send() - .await - .map_err(|e| anyhow!("request to {}: {}", project, e))?; + let resp = req.send().await.map_err(|e| { + let reason = if e.is_timeout() { + "timeout" + } else if e.is_connect() { + "connect" + } else { + "other" + }; + metrics::global() + .upstream_connection_errors_total + .with_label_values(&[reason]) + .inc(); + anyhow!("request to {}: {}", project, e) + })?; let status = resp.status().as_u16(); metrics::global() @@ -861,6 +884,11 @@ impl Resolver { } pub async fn persist_hot_entries(&self) { + metrics::global() + .cache_entries + .with_label_values(&["local"]) + .set(self.cache.entry_count() as f64); + if self.cache_warmup_top_n == 0 { return; }