Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
676 changes: 676 additions & 0 deletions src/compaction.rs

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions src/job_store_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub struct LsmState {
pub total_l0_size: u64,
/// Total estimated size of all sorted runs in bytes.
pub total_sorted_run_size: u64,
/// Space amplification percentage: (non-bottom-run size / bottom-run size) * 100.
pub space_amplification_percent: f64,
}

/// Information about a single SST file.
Expand Down Expand Up @@ -295,6 +297,12 @@ impl JobStoreShard {
db_builder = db_builder.with_settings(settings);
}

// Use Silo's custom compaction scheduler, which has a space-amplification trigger,
// to aggressively clean up tombstones produced by the high-churn job lifecycle.
db_builder = db_builder.with_compaction_scheduler_supplier(Arc::new(
crate::compaction::SiloCompactionSchedulerSupplier,
));

let db = db_builder.build().await?;
let db = Arc::new(db);
let concurrency = Arc::new(ConcurrencyManager::new());
Expand Down Expand Up @@ -512,12 +520,19 @@ impl JobStoreShard {

let total_l0_size: u64 = l0_ssts.iter().map(|s| s.estimated_size).sum();
let total_sorted_run_size: u64 = sorted_runs.iter().map(|s| s.estimated_size).sum();
let sr_sizes: Vec<u64> = sorted_runs.iter().map(|s| s.estimated_size).collect();
let space_amplification_percent =
crate::compaction::compute_space_amplification_percent_from_sizes(
total_l0_size,
&sr_sizes,
);

Ok(LsmState {
l0_ssts,
sorted_runs,
total_l0_size,
total_sorted_run_size,
space_amplification_percent,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod arrow_ipc;
pub mod cluster_client;
pub mod cluster_query;
pub mod codec;
pub mod compaction;
pub mod concurrency;
pub mod coordination;
pub mod dst_events;
Expand Down
61 changes: 61 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ pub struct Metrics {
slatedb_running_compactions: GaugeVec,
slatedb_last_compaction_ts_sec: GaugeVec,

// Compaction health gauges (per-shard, updated periodically)
shard_l0_sst_count: GaugeVec,
shard_sorted_run_count: GaugeVec,
shard_space_amplification_percent: GaugeVec,

/// Tracks previous SlateDB counter values per (stat_name, shard) for delta computation.
/// SlateDB exposes counters as absolute values via `stat.get()`, but Prometheus counters
/// only support `inc_by(delta)`, so we compute deltas between polls.
Expand Down Expand Up @@ -217,6 +222,25 @@ impl Metrics {
self.concurrency_tickets_granted.inc();
}

/// Publish the compaction health gauges for a single shard.
///
/// Each gauge is keyed by the `shard` label and overwritten with the supplied
/// reading:
///
/// * `l0_count` is stored in the L0 SST count gauge.
/// * `sr_count` is stored in the sorted-run count gauge.
/// * `space_amp_percent` is stored in the space amplification gauge as-is.
///
/// The two counts are converted to `f64` because the gauges hold floating-point
/// values.
pub fn update_compaction_health(
    &self,
    shard: &str,
    l0_count: usize,
    sr_count: usize,
    space_amp_percent: f64,
) {
    // Pair every gauge with its new reading so all three are updated uniformly,
    // in the same order as before: L0 count, sorted-run count, space amplification.
    let readings = [
        (&self.shard_l0_sst_count, l0_count as f64),
        (&self.shard_sorted_run_count, sr_count as f64),
        (&self.shard_space_amplification_percent, space_amp_percent),
    ];

    for (gauge, value) in readings {
        gauge.with_label_values(&[shard]).set(value);
    }
}

/// Update SlateDB storage metrics from a shard's StatRegistry.
///
/// Call this periodically (e.g., every second) to sync SlateDB's internal
Expand Down Expand Up @@ -628,6 +652,40 @@ pub fn init() -> anyhow::Result<Metrics> {
)?,
);

// Compaction health gauges
let shard_l0_sst_count = register(
&registry,
GaugeVec::new(
Opts::new(
"silo_shard_l0_sst_count",
"Number of L0 SSTs in the shard's LSM tree",
),
&["shard"],
)?,
);

let shard_sorted_run_count = register(
&registry,
GaugeVec::new(
Opts::new(
"silo_shard_sorted_run_count",
"Number of sorted runs in the shard's LSM tree",
),
&["shard"],
)?,
);

let shard_space_amplification_percent = register(
&registry,
GaugeVec::new(
Opts::new(
"silo_shard_space_amplification_percent",
"Space amplification percentage (non-bottom / bottom run size * 100)",
),
&["shard"],
)?,
);

Ok(Metrics {
registry: Arc::new(registry),
jobs_enqueued,
Expand Down Expand Up @@ -659,6 +717,9 @@ pub fn init() -> anyhow::Result<Metrics> {
slatedb_wal_buffer_estimated_bytes,
slatedb_running_compactions,
slatedb_last_compaction_ts_sec,
shard_l0_sst_count,
shard_sorted_run_count,
shard_space_amplification_percent,
slatedb_prev_values: Arc::new(Mutex::new(HashMap::new())),
})
}
Expand Down
27 changes: 27 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1969,10 +1969,12 @@ where
let reaper_metrics = metrics.clone();
let reaper: JoinHandle<()> = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
let mut tick_count: u64 = 0;
loop {
tokio::select! {
biased;
_ = interval.tick() => {
tick_count += 1;
let instances = reaper_factory.instances();

// Update shards open metric
Expand All @@ -1990,6 +1992,31 @@ where
m.update_slatedb_stats(&shard_id.to_string(), &stats);
}
}

// Every ~5s (50 ticks at 100ms), read LSM state for compaction health metrics
if tick_count % 50 == 0
&& let Some(ref m) = reaper_metrics
{
for (shard_id, shard) in instances.iter() {
match shard.read_lsm_state().await {
Ok(lsm) => {
m.update_compaction_health(
&shard_id.to_string(),
lsm.l0_ssts.len(),
lsm.sorted_runs.len(),
lsm.space_amplification_percent,
);
}
Err(e) => {
tracing::debug!(
shard = %shard_id,
error = %e,
"failed to read LSM state for compaction health metrics"
);
}
}
}
}
}
_ = tick_rx.recv() => { break; }
}
Expand Down
2 changes: 2 additions & 0 deletions src/webui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ struct LsmStateView {
total_sorted_run_size: String,
total_size: String,
sorted_runs: Vec<SortedRunView>,
space_amplification_percent: f64,
}

struct SortedRunView {
Expand Down Expand Up @@ -1949,6 +1950,7 @@ async fn shard_handler(
estimated_size: format_byte_size(sr.estimated_size),
})
.collect(),
space_amplification_percent: lsm.space_amplification_percent,
}),
Err(e) => {
warn!(shard_id = %params.id, error = %e, "failed to read LSM state");
Expand Down
31 changes: 28 additions & 3 deletions templates/shard.html
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ <h2 class="text-sm font-semibold text-zinc-300">Storage (LSM Tree)</h2>
</div>
<div class="p-4 space-y-4">
<!-- Summary -->
<dl class="grid grid-cols-2 md:grid-cols-4 gap-4">
<dl class="grid grid-cols-2 md:grid-cols-5 gap-4">
<div>
<dt class="text-xs text-zinc-500 uppercase tracking-wider">L0 SSTs</dt>
<dd class="text-sm text-zinc-200 mt-1">{{ lsm.l0_sst_count }}</dd>
Expand All @@ -116,6 +116,12 @@ <h2 class="text-sm font-semibold text-zinc-300">Storage (LSM Tree)</h2>
<dt class="text-xs text-zinc-500 uppercase tracking-wider">Sorted Run Size</dt>
<dd class="text-sm text-zinc-200 mt-1">{{ lsm.total_sorted_run_size }}</dd>
</div>
<div>
<dt class="text-xs text-zinc-500 uppercase tracking-wider">Space Amp</dt>
<dd class="text-sm mt-1 {% if lsm.space_amplification_percent > 25.0 %}text-red-400{% else if lsm.space_amplification_percent > 15.0 %}text-amber-400{% else %}text-green-400{% endif %}">
{{ lsm.space_amplification_percent|fmt("{:.1}") }}%
</dd>
</div>
</dl>

{% if !lsm.sorted_runs.is_empty() %}
Expand Down Expand Up @@ -144,14 +150,33 @@ <h3 class="text-xs text-zinc-500 uppercase tracking-wider mb-2">Sorted Runs</h3>
{% endif %}

<!-- Compaction hints -->
{% if lsm.l0_sst_count > 4 || lsm.sorted_run_count > 1 %}
{% if lsm.l0_sst_count > 8 || lsm.sorted_run_count > 3 || lsm.space_amplification_percent > 25.0 %}
<div class="bg-red-900/20 border border-red-700/50 px-3 py-2 text-xs text-red-300">
{% if lsm.l0_sst_count > 8 %}
Critical: Very high L0 SST count ({{ lsm.l0_sst_count }}). Compaction is significantly lagging.
{% endif %}
{% if lsm.sorted_run_count > 3 %}
Critical: Many sorted runs ({{ lsm.sorted_run_count }}) causing high read amplification.
{% endif %}
{% if lsm.space_amplification_percent > 25.0 %}
Critical: Space amplification ({{ lsm.space_amplification_percent|fmt("{:.1}") }}%) exceeds 25% threshold. Tombstones are accumulating.
{% endif %}
</div>
{% else if lsm.l0_sst_count > 4 || lsm.sorted_run_count > 1 || lsm.space_amplification_percent > 15.0 %}
<div class="bg-amber-900/20 border border-amber-700/50 px-3 py-2 text-xs text-amber-300">
{% if lsm.l0_sst_count > 4 %}
High L0 SST count ({{ lsm.l0_sst_count }}) may indicate compaction lag.
Moderate: L0 SST count ({{ lsm.l0_sst_count }}) is elevated. Compaction may be lagging.
{% endif %}
{% if lsm.sorted_run_count > 1 %}
Multiple sorted runs ({{ lsm.sorted_run_count }}) means read amplification; compaction will merge them.
{% endif %}
{% if lsm.space_amplification_percent > 15.0 %}
Moderate: Space amplification ({{ lsm.space_amplification_percent|fmt("{:.1}") }}%) is approaching the 25% threshold.
{% endif %}
</div>
{% else %}
<div class="bg-green-900/20 border border-green-700/50 px-3 py-2 text-xs text-green-300">
Compaction health is good.
</div>
{% endif %}

Expand Down
Loading
Loading