Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ version = "0.2.0"
[dependencies]
clap = { version = "4.3.14", features = ["derive", "env"] }
futures = "0.3"
slatedb = { git = "https://github.com/gadget-inc/slatedb.git", branch = "gadget" }
slatedb = { git = "https://github.com/gadget-inc/slatedb.git", branch = "yo/relax-wal-uri-check" }
slatedb-common = { git = "https://github.com/gadget-inc/slatedb.git", branch = "yo/relax-wal-uri-check" }
object_store = { version = "0.12", features = ["gcp"] }
tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
14 changes: 10 additions & 4 deletions src/job_store_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use helpers::WriteBatcher;
pub use helpers::now_epoch_ms;

use slatedb::Db;
use slatedb_common::metrics::DefaultMetricsRecorder;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to slatedb/slatedb#1460

> If users now want metrics they must register their own recorder. They can follow the test examples and use the DefaultMetricsRecorder if they want a simple, lightweight metrics backend.

The design intent is that consumers like silo must pass a MetricsRecorder via DbBuilder::with_metrics_recorder() — there's no longer a built-in always-on recorder.

use std::sync::{Arc, OnceLock};
use std::time::Duration;
use thiserror::Error;
Expand Down Expand Up @@ -107,6 +108,8 @@ pub struct JobStoreShard {
db_path: String,
/// The shard's tenant range, immutable after opening.
range: ShardRange,
/// Metrics recorder for SlateDB, passed to DbBuilder and used for stats collection.
slatedb_metrics_recorder: Arc<DefaultMetricsRecorder>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -306,8 +309,10 @@ impl JobStoreShard {
concurrency_reconcile_interval,
} = options;

let slatedb_metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let mut db_builder = slatedb::DbBuilder::new(db_path, store.clone())
.with_merge_operator(counters::counter_merge_operator());
.with_merge_operator(counters::counter_merge_operator())
.with_metrics_recorder(slatedb_metrics_recorder.clone());

// Configure separate WAL object store if provided
if let Some(wal) = wal_store {
Expand Down Expand Up @@ -380,6 +385,7 @@ impl JobStoreShard {
store,
db_path: db_path.to_string(),
range: range.clone(),
slatedb_metrics_recorder,
});

// Periodically reconcile pending concurrency requests to self-heal from
Expand Down Expand Up @@ -524,10 +530,10 @@ impl JobStoreShard {
&self.db
}

/// Get the SlateDB metrics registry for this shard.
/// Get the SlateDB metrics recorder for this shard.
/// Use this to collect storage-level statistics for observability.
pub fn slatedb_stats(&self) -> std::sync::Arc<slatedb::stats::StatRegistry> {
self.db.metrics()
pub fn slatedb_metrics_recorder(&self) -> &Arc<DefaultMetricsRecorder> {
&self.slatedb_metrics_recorder
}

/// Read LSM tree state from the SlateDB manifest.
Expand Down
25 changes: 19 additions & 6 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use prometheus::{
Counter, CounterVec, Encoder, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
TextEncoder, core::Collector,
};
use slatedb::stats::StatRegistry;
use slatedb_common::metrics::{DefaultMetricsRecorder, MetricValue};
use tokio::sync::broadcast;
use tracing::{debug, error};

Expand Down Expand Up @@ -269,7 +269,7 @@ impl Metrics {
/// Call this periodically (e.g., every second) to sync SlateDB's internal
/// statistics to Prometheus metrics. Counter-type stats are tracked via deltas
/// since Prometheus counters only support `inc_by()`, not `set()`.
pub fn update_slatedb_stats(&self, shard: &str, stats: &Arc<StatRegistry>) {
pub fn update_slatedb_stats(&self, shard: &str, recorder: &DefaultMetricsRecorder) {
// Counter-type stats: monotonically increasing in SlateDB
let counter_mappings: &[(&str, &CounterVec)] = &[
(slatedb::db_stats::GET_REQUESTS, &self.slatedb_get_requests),
Expand Down Expand Up @@ -343,11 +343,18 @@ impl Metrics {
),
];

let snapshot = recorder.snapshot();

{
let mut prev_values = self.slatedb_prev_values.lock().expect("lock poisoned");
for (stat_name, counter) in counter_mappings {
if let Some(stat) = stats.lookup(stat_name) {
let current = stat.get() as f64;
if let Some(metric) = snapshot.by_name(stat_name).first() {
let current = match &metric.value {
MetricValue::Counter(v) => *v as f64,
MetricValue::Gauge(v) => *v as f64,
MetricValue::UpDownCounter(v) => *v as f64,
MetricValue::Histogram { .. } => continue,
};
let key = (stat_name.to_string(), shard.to_string());
let prev = prev_values.get(&key).copied().unwrap_or(0.0);
if current > prev {
Expand All @@ -359,8 +366,14 @@ impl Metrics {
}

for (stat_name, gauge) in gauge_mappings {
if let Some(stat) = stats.lookup(stat_name) {
gauge.with_label_values(&[shard]).set(stat.get() as f64);
if let Some(metric) = snapshot.by_name(stat_name).first() {
let value = match &metric.value {
MetricValue::Counter(v) => *v as f64,
MetricValue::Gauge(v) => *v as f64,
MetricValue::UpDownCounter(v) => *v as f64,
MetricValue::Histogram { .. } => continue,
};
gauge.with_label_values(&[shard]).set(value);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2018,8 +2018,8 @@ where

// Collect SlateDB storage metrics for this shard
if let Some(ref m) = reaper_metrics {
let stats = shard.slatedb_stats();
m.update_slatedb_stats(&shard_id.to_string(), &stats);
let recorder = shard.slatedb_metrics_recorder();
m.update_slatedb_stats(&shard_id.to_string(), recorder);
}
}
}
Expand Down
36 changes: 23 additions & 13 deletions tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,15 @@ async fn test_metrics_all_recording_methods() {
async fn test_metrics_slatedb_stats() {
let (metrics, _app) = create_metrics_router();

// Create a real SlateDB to get a StatRegistry with actual stats
// Create a real SlateDB with a metrics recorder
let tmpdir = tempfile::tempdir().unwrap();
let object_store = std::sync::Arc::new(
object_store::local::LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(),
);
let db = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store)
let recorder = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new());
let db = slatedb::DbBuilder::new("test-db", object_store)
.with_metrics_recorder(recorder.clone())
.build()
.await
.unwrap();

Expand All @@ -377,8 +380,7 @@ async fn test_metrics_slatedb_stats() {
db.put(b"key2", b"value2").await.unwrap();
let _ = db.get(b"key1").await;

let stat_registry = db.metrics();
metrics.update_slatedb_stats("0", &stat_registry);
metrics.update_slatedb_stats("0", &recorder);

let body_str = gather_metrics_text(&metrics);

Expand Down Expand Up @@ -443,14 +445,16 @@ async fn test_metrics_slatedb_counter_delta_tracking() {
let object_store = std::sync::Arc::new(
object_store::local::LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(),
);
let db = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store)
let recorder = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new());
let db = slatedb::DbBuilder::new("test-db", object_store)
.with_metrics_recorder(recorder.clone())
.build()
.await
.unwrap();

// First batch of writes
db.put(b"key1", b"value1").await.unwrap();
let stat_registry = db.metrics();
metrics.update_slatedb_stats("0", &stat_registry);
metrics.update_slatedb_stats("0", &recorder);

let body1 = gather_metrics_text(&metrics);
let write_ops_1 =
Expand All @@ -460,7 +464,7 @@ async fn test_metrics_slatedb_counter_delta_tracking() {
// Second batch of writes — delta tracking should accumulate, not reset
db.put(b"key2", b"value2").await.unwrap();
db.put(b"key3", b"value3").await.unwrap();
metrics.update_slatedb_stats("0", &stat_registry);
metrics.update_slatedb_stats("0", &recorder);

let body2 = gather_metrics_text(&metrics);
let write_ops_2 =
Expand All @@ -473,7 +477,7 @@ async fn test_metrics_slatedb_counter_delta_tracking() {
);

// Calling update again without new writes should not change the counter
metrics.update_slatedb_stats("0", &stat_registry);
metrics.update_slatedb_stats("0", &recorder);
let body3 = gather_metrics_text(&metrics);
let write_ops_3 =
extract_metric_value(&body3, &["silo_slatedb_write_ops_total", "shard=\"0\""]);
Expand All @@ -493,15 +497,21 @@ async fn test_metrics_slatedb_multiple_shards() {
let object_store1 = std::sync::Arc::new(
object_store::local::LocalFileSystem::new_with_prefix(tmpdir1.path()).unwrap(),
);
let db1 = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store1)
let recorder1 = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new());
let db1 = slatedb::DbBuilder::new("test-db", object_store1)
.with_metrics_recorder(recorder1.clone())
.build()
.await
.unwrap();

let tmpdir2 = tempfile::tempdir().unwrap();
let object_store2 = std::sync::Arc::new(
object_store::local::LocalFileSystem::new_with_prefix(tmpdir2.path()).unwrap(),
);
let db2 = slatedb::Db::open(object_store::path::Path::from("test-db"), object_store2)
let recorder2 = std::sync::Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new());
let db2 = slatedb::DbBuilder::new("test-db", object_store2)
.with_metrics_recorder(recorder2.clone())
.build()
.await
.unwrap();

Expand All @@ -510,8 +520,8 @@ async fn test_metrics_slatedb_multiple_shards() {
db2.put(b"b", b"2").await.unwrap();
db2.put(b"c", b"3").await.unwrap();

metrics.update_slatedb_stats("shard-a", &db1.metrics());
metrics.update_slatedb_stats("shard-b", &db2.metrics());
metrics.update_slatedb_stats("shard-a", &recorder1);
metrics.update_slatedb_stats("shard-b", &recorder2);

let body = gather_metrics_text(&metrics);
let shard_a_writes = extract_metric_value(
Expand Down
Loading