Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::metrics;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -17,6 +18,10 @@ pub trait CacheBackend: Send + Sync {
self.set(key, value).await;
}
async fn delete(&self, key: &str);

fn entry_count(&self) -> u64 {
0
}
}

/// Type-erased cache handle, cheaply cloneable.
Expand Down Expand Up @@ -100,6 +105,10 @@ impl CacheBackend for MokaCache {
async fn delete(&self, key: &str) {
self.inner.invalidate(key);
}

fn entry_count(&self) -> u64 {
self.inner.entry_count()
}
}

// ─── Redis / Redis Sentinel ─────────────────────────────────────────────────
Expand Down Expand Up @@ -204,9 +213,9 @@ impl RedisCache {
}
}

/// Attempt to reconnect via Sentinel (cold path, mutex-protected).
async fn try_reconnect(&self) {
if let Ok(mut sentinel) = self.sentinel.try_lock() {
metrics::global().redis_reconnections_total.inc();
match sentinel.get_async_connection().await {
Ok(new_conn) => {
self.conn.store(Arc::new(new_conn));
Expand All @@ -217,7 +226,6 @@ impl RedisCache {
}
}
}
// If try_lock fails, another task is already reconnecting — skip.
}
}

Expand All @@ -227,8 +235,19 @@ impl CacheBackend for RedisCache {
let redis_key = self.prefixed(key);
let mut conn = (*self.conn.load_full()).clone();
match redis::AsyncCommands::get::<_, Option<String>>(&mut conn, &redis_key).await {
Ok(v) => v,
Ok(v) => {
metrics::global()
.redis_operations_total
.with_label_values(&["get", "ok"])
.inc();
v
}
Err(e) => {
metrics::global()
.redis_operations_total
.with_label_values(&["get", "error"])
.inc();
metrics::global().redis_fallback_total.inc();
tracing::debug!(error = %e, key, "redis GET failed, falling back to moka");
self.try_reconnect().await;
self.fallback.get(key)
Expand All @@ -242,8 +261,18 @@ impl CacheBackend for RedisCache {
let res: Result<(), _> =
redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, self.ttl_secs).await;
if let Err(e) = res {
metrics::global()
.redis_operations_total
.with_label_values(&["set", "error"])
.inc();
metrics::global().redis_fallback_total.inc();
tracing::debug!(error = %e, key, "redis SET failed, falling back to moka");
self.try_reconnect().await;
} else {
metrics::global()
.redis_operations_total
.with_label_values(&["set", "ok"])
.inc();
}
// Always populate local fallback for graceful degradation.
self.fallback.insert(key, value);
Expand All @@ -256,8 +285,18 @@ impl CacheBackend for RedisCache {
let res: Result<(), _> =
redis::AsyncCommands::set_ex(&mut conn, &redis_key, &value, ttl_secs).await;
if let Err(e) = res {
metrics::global()
.redis_operations_total
.with_label_values(&["set", "error"])
.inc();
metrics::global().redis_fallback_total.inc();
tracing::debug!(error = %e, key, "redis SET failed, falling back to moka");
self.try_reconnect().await;
} else {
metrics::global()
.redis_operations_total
.with_label_values(&["set", "ok"])
.inc();
}
// Always populate local fallback for graceful degradation.
self.fallback.insert(key, value);
Expand All @@ -267,11 +306,24 @@ impl CacheBackend for RedisCache {
let redis_key = self.prefixed(key);
let mut conn = (*self.conn.load_full()).clone();
if let Err(e) = redis::AsyncCommands::del::<_, ()>(&mut conn, &redis_key).await {
metrics::global()
.redis_operations_total
.with_label_values(&["del", "error"])
.inc();
tracing::debug!(error = %e, key, "redis DEL failed");
self.try_reconnect().await;
} else {
metrics::global()
.redis_operations_total
.with_label_values(&["del", "ok"])
.inc();
}
self.fallback.invalidate(key);
}

fn entry_count(&self) -> u64 {
self.fallback.entry_count()
}
}

#[cfg(test)]
Expand Down
33 changes: 30 additions & 3 deletions src/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl CircuitBreaker {
.circuit_breaker_state
.with_label_values(&[project])
.set(STATE_HALF_OPEN as f64);
metrics::global()
.circuit_breaker_transitions_total
.with_label_values(&[project, "open", "half_open"])
.inc();
}
true
} else {
Expand All @@ -79,15 +83,22 @@ impl CircuitBreaker {
let health = self.project_health(project);
health.consecutive_failures.store(0, Ordering::Release);

let mut current = health.state.load(Ordering::Acquire);
let previous = health.state.load(Ordering::Acquire);
let mut current = previous;
while current != STATE_CLOSED {
match health.state.compare_exchange(
current,
STATE_CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Ok(_) => {
metrics::global()
.circuit_breaker_transitions_total
.with_label_values(&[project, state_name(current), "closed"])
.inc();
break;
}
Err(actual) => current = actual,
}
}
Expand All @@ -114,7 +125,13 @@ impl CircuitBreaker {
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Ok(_) => {
metrics::global()
.circuit_breaker_transitions_total
.with_label_values(&[project, state_name(current), "open"])
.inc();
break;
}
Err(actual) => current = actual,
}
}
Expand All @@ -134,6 +151,16 @@ impl CircuitBreaker {
}
}

#[inline]
fn state_name(state: u8) -> &'static str {
match state {
STATE_CLOSED => "closed",
STATE_OPEN => "open",
STATE_HALF_OPEN => "half_open",
_ => "unknown",
}
}

#[inline]
fn now_epoch_secs() -> u64 {
SystemTime::now()
Expand Down
13 changes: 12 additions & 1 deletion src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arc_swap::ArcSwap;
use secrecy::{ExposeSecret, SecretString};
use serde::Deserialize;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info};

const DISCOVERY_CACHE_KEY: &str = "discovery:projects";
Expand Down Expand Up @@ -187,8 +187,16 @@ impl Discoverer {
}

async fn refresh(&self) {
let start = Instant::now();
match self.fetch_proxy_cache_projects().await {
Ok(projects) => {
let elapsed = start.elapsed().as_secs_f64();
metrics::global().discovery_duration.observe(elapsed);
metrics::global().discovery_last_success_timestamp.set(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0.0, |d| d.as_secs_f64()),
);
let count = projects.len();
self.persist_to_cache(&projects).await;
self.inner.projects.store(Arc::new(projects));
Expand All @@ -201,6 +209,9 @@ impl Discoverer {
);
}
Err(e) => {
let elapsed = start.elapsed().as_secs_f64();
metrics::global().discovery_duration.observe(elapsed);
metrics::global().discovery_errors_total.inc();
error!(
event = "discovery",
error = %e,
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ async fn rate_limit_middleware(
match limiter.check_key(&ip) {
Ok(_) => next.run(req).await,
Err(_) => {
metrics::global().rate_limit_rejected_total.inc();
warn!(
event = "rate_limit",
client_ip = %ip,
Expand Down
Loading