diff --git a/Cargo.lock b/Cargo.lock index bc23678..2410ff1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5424,6 +5424,7 @@ dependencies = [ "serde_json", "silo-macros", "slatedb", + "slatedb-common", "storekey", "tempfile", "thiserror 1.0.69", @@ -5499,7 +5500,7 @@ checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "slatedb" version = "0.11.1" -source = "git+https://github.com/gadget-inc/slatedb.git?branch=gadget#91aca69be4cfe7d2642aace000d374d8f088a70f" +source = "git+https://github.com/gadget-inc/slatedb.git?branch=yo%2Frelax-wal-uri-check#ad443c2bd49e6145180b11aa52537414a0de77d4" dependencies = [ "anyhow", "async-trait", @@ -5545,7 +5546,7 @@ dependencies = [ [[package]] name = "slatedb-common" version = "0.11.1" -source = "git+https://github.com/gadget-inc/slatedb.git?branch=gadget#91aca69be4cfe7d2642aace000d374d8f088a70f" +source = "git+https://github.com/gadget-inc/slatedb.git?branch=yo%2Frelax-wal-uri-check#ad443c2bd49e6145180b11aa52537414a0de77d4" dependencies = [ "chrono", "log", @@ -5556,7 +5557,7 @@ dependencies = [ [[package]] name = "slatedb-txn-obj" version = "0.11.1" -source = "git+https://github.com/gadget-inc/slatedb.git?branch=gadget#91aca69be4cfe7d2642aace000d374d8f088a70f" +source = "git+https://github.com/gadget-inc/slatedb.git?branch=yo%2Frelax-wal-uri-check#ad443c2bd49e6145180b11aa52537414a0de77d4" dependencies = [ "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 1b060bb..0a4cc91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,8 @@ version = "0.2.0" [dependencies] clap = { version = "4.3.14", features = ["derive", "env"] } futures = "0.3" -slatedb = { git = "https://github.com/gadget-inc/slatedb.git", branch = "gadget" } +slatedb = { git = "https://github.com/gadget-inc/slatedb.git", branch = "yo/relax-wal-uri-check" } +slatedb-common = { git = "https://github.com/gadget-inc/slatedb.git", branch = "yo/relax-wal-uri-check" } object_store = { version = "0.12", features = ["gcp"] } tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] } serde = { version = "1.0", features = ["derive"] } diff --git a/src/job_store_shard/mod.rs b/src/job_store_shard/mod.rs index 42a7ead..60b0ca1 100644 --- a/src/job_store_shard/mod.rs +++ b/src/job_store_shard/mod.rs @@ -28,6 +28,7 @@ use helpers::WriteBatcher; pub use helpers::now_epoch_ms; use slatedb::Db; +use slatedb_common::metrics::DefaultMetricsRecorder; use std::sync::{Arc, OnceLock}; use std::time::Duration; use thiserror::Error; @@ -107,6 +108,8 @@ pub struct JobStoreShard { db_path: String, /// The shard's tenant range, immutable after opening. range: ShardRange, + /// Metrics recorder for SlateDB, passed to DbBuilder and used for stats collection. + slatedb_metrics_recorder: Arc, } #[derive(Debug, Error)] @@ -306,8 +309,10 @@ impl JobStoreShard { concurrency_reconcile_interval, } = options; + let slatedb_metrics_recorder = Arc::new(DefaultMetricsRecorder::new()); let mut db_builder = slatedb::DbBuilder::new(db_path, store.clone()) - .with_merge_operator(counters::counter_merge_operator()); + .with_merge_operator(counters::counter_merge_operator()) + .with_metrics_recorder(slatedb_metrics_recorder.clone()); // Configure separate WAL object store if provided if let Some(wal) = wal_store { @@ -380,6 +385,7 @@ impl JobStoreShard { store, db_path: db_path.to_string(), range: range.clone(), + slatedb_metrics_recorder, }); // Periodically reconcile pending concurrency requests to self-heal from @@ -524,10 +530,10 @@ impl JobStoreShard { &self.db } - /// Get the SlateDB metrics registry for this shard. + /// Get the SlateDB metrics recorder for this shard. /// Use this to collect storage-level statistics for observability. - pub fn slatedb_stats(&self) -> std::sync::Arc { - self.db.metrics() + pub fn slatedb_metrics_recorder(&self) -> &Arc { + &self.slatedb_metrics_recorder } /// Read LSM tree state from the SlateDB manifest. diff --git a/src/metrics.rs b/src/metrics.rs index ed5f80d..d006145 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -31,7 +31,7 @@ use prometheus::{ Counter, CounterVec, Encoder, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder, core::Collector, }; -use slatedb::stats::StatRegistry; +use slatedb_common::metrics::{DefaultMetricsRecorder, MetricValue}; use tokio::sync::broadcast; use tracing::{debug, error}; @@ -269,7 +269,7 @@ impl Metrics { /// Call this periodically (e.g., every second) to sync SlateDB's internal /// statistics to Prometheus metrics. Counter-type stats are tracked via deltas /// since Prometheus counters only support `inc_by()`, not `set()`. - pub fn update_slatedb_stats(&self, shard: &str, stats: &Arc) { + pub fn update_slatedb_stats(&self, shard: &str, recorder: &DefaultMetricsRecorder) { // Counter-type stats: monotonically increasing in SlateDB let counter_mappings: &[(&str, &CounterVec)] = &[ (slatedb::db_stats::GET_REQUESTS, &self.slatedb_get_requests), @@ -343,11 +343,18 @@ impl Metrics { ), ]; + let snapshot = recorder.snapshot(); + { let mut prev_values = self.slatedb_prev_values.lock().expect("lock poisoned"); for (stat_name, counter) in counter_mappings { - if let Some(stat) = stats.lookup(stat_name) { - let current = stat.get() as f64; + if let Some(metric) = snapshot.by_name(stat_name).first() { + let current = match &metric.value { + MetricValue::Counter(v) => *v as f64, + MetricValue::Gauge(v) => *v as f64, + MetricValue::UpDownCounter(v) => *v as f64, + MetricValue::Histogram { .. } => continue, + }; let key = (stat_name.to_string(), shard.to_string()); let prev = prev_values.get(&key).copied().unwrap_or(0.0); if current > prev { @@ -359,8 +366,14 @@ impl Metrics { } for (stat_name, gauge) in gauge_mappings { - if let Some(stat) = stats.lookup(stat_name) { - gauge.with_label_values(&[shard]).set(stat.get() as f64); + if let Some(metric) = snapshot.by_name(stat_name).first() { + let value = match &metric.value { + MetricValue::Counter(v) => *v as f64, + MetricValue::Gauge(v) => *v as f64, + MetricValue::UpDownCounter(v) => *v as f64, + MetricValue::Histogram { .. } => continue, + }; + gauge.with_label_values(&[shard]).set(value); } } } diff --git a/src/server.rs b/src/server.rs index 04ecdf6..181af7c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2018,8 +2018,8 @@ where // Collect SlateDB storage metrics for this shard if let Some(ref m) = reaper_metrics { - let stats = shard.slatedb_stats(); - m.update_slatedb_stats(&shard_id.to_string(), &stats); + let recorder = shard.slatedb_metrics_recorder(); + m.update_slatedb_stats(&shard_id.to_string(), recorder); } } } diff --git a/tests/metrics_tests.rs b/tests/metrics_tests.rs index b824225..99d363d 100644 --- a/tests/metrics_tests.rs +++ b/tests/metrics_tests.rs @@ -363,12 +363,15 @@ async fn test_metrics_all_recording_methods() { async fn test_metrics_slatedb_stats() { let (metrics, _app) = create_metrics_router(); - // Create a real SlateDB to get a StatRegistry with actual stats + // Create a real SlateDB with a metrics recorder let tmpdir = tempfile::tempdir().unwrap(); let object_store = std::sync::Arc::new( object_store::local::LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(), ); - let db = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store) + let recorder = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new()); + let db = slatedb::DbBuilder::new("test-db", object_store) + .with_metrics_recorder(recorder.clone()) + .build() .await .unwrap(); @@ -377,8 +380,7 @@ async fn test_metrics_slatedb_stats() { db.put(b"key2", b"value2").await.unwrap(); let _ = db.get(b"key1").await; - let stat_registry = db.metrics(); - metrics.update_slatedb_stats("0", &stat_registry); + metrics.update_slatedb_stats("0", &recorder); let body_str = gather_metrics_text(&metrics); @@ -443,14 +445,16 @@ async fn test_metrics_slatedb_counter_delta_tracking() { let object_store = std::sync::Arc::new( object_store::local::LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(), ); - let db = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store) + let recorder = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new()); + let db = slatedb::DbBuilder::new("test-db", object_store) + .with_metrics_recorder(recorder.clone()) + .build() .await .unwrap(); // First batch of writes db.put(b"key1", b"value1").await.unwrap(); - let stat_registry = db.metrics(); - metrics.update_slatedb_stats("0", &stat_registry); + metrics.update_slatedb_stats("0", &recorder); let body1 = gather_metrics_text(&metrics); let write_ops_1 = @@ -460,7 +464,7 @@ async fn test_metrics_slatedb_counter_delta_tracking() { // Second batch of writes — delta tracking should accumulate, not reset db.put(b"key2", b"value2").await.unwrap(); db.put(b"key3", b"value3").await.unwrap(); - metrics.update_slatedb_stats("0", &stat_registry); + metrics.update_slatedb_stats("0", &recorder); let body2 = gather_metrics_text(&metrics); let write_ops_2 = @@ -473,7 +477,7 @@ async fn test_metrics_slatedb_counter_delta_tracking() { ); // Calling update again without new writes should not change the counter - metrics.update_slatedb_stats("0", &stat_registry); + metrics.update_slatedb_stats("0", &recorder); let body3 = gather_metrics_text(&metrics); let write_ops_3 = extract_metric_value(&body3, &["silo_slatedb_write_ops_total", "shard=\"0\""]); @@ -493,7 +497,10 @@ async fn test_metrics_slatedb_multiple_shards() { let object_store1 = std::sync::Arc::new( object_store::local::LocalFileSystem::new_with_prefix(tmpdir1.path()).unwrap(), ); - let db1 = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store1) + let recorder1 = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new()); + let db1 = slatedb::DbBuilder::new("test-db", object_store1) + .with_metrics_recorder(recorder1.clone()) + .build() .await .unwrap(); @@ -501,7 +508,10 @@ async fn test_metrics_slatedb_multiple_shards() { let object_store2 = std::sync::Arc::new( object_store::local::LocalFileSystem::new_with_prefix(tmpdir2.path()).unwrap(), ); - let db2 = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store2) + let recorder2 = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new()); + let db2 = slatedb::DbBuilder::new("test-db", object_store2) + .with_metrics_recorder(recorder2.clone()) + .build() .await .unwrap(); @@ -510,8 +520,8 @@ async fn test_metrics_slatedb_multiple_shards() { db2.put(b"b", b"2").await.unwrap(); db2.put(b"c", b"3").await.unwrap(); - metrics.update_slatedb_stats("shard-a", &db1.metrics()); - metrics.update_slatedb_stats("shard-b", &db2.metrics()); + metrics.update_slatedb_stats("shard-a", &recorder1); + metrics.update_slatedb_stats("shard-b", &recorder2); let body = gather_metrics_text(&metrics); let shard_a_writes = extract_metric_value(