Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ pub struct PersistConfig {
/// In Compactor::compact_and_apply_background, how many updates to encode or
/// decode before voluntarily yielding the task.
pub compaction_yield_after_n_updates: usize,
/// In Compactor::compact_and_apply_background, maximum time (in milliseconds)
/// to spend processing a single chunk before returning to allow yielding.
/// This prevents long-running synchronous work from blocking the runtime.
pub compaction_yield_after_ms: u64,
/// Length of time after a writer's last operation after which the writer
/// may be expired.
pub writer_lease_duration: Duration,
Expand Down Expand Up @@ -175,6 +179,7 @@ impl PersistConfig {
compaction_concurrency_limit: 5,
compaction_queue_size: 20,
compaction_yield_after_n_updates: 100_000,
compaction_yield_after_ms: 10,
writer_lease_duration: 60 * Duration::from_secs(60),
critical_downgrade_interval: Duration::from_secs(30),
isolated_runtime_worker_threads: num_cpus::get(),
Expand Down
3 changes: 3 additions & 0 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub struct CompactRes<T> {
pub struct CompactConfig {
pub(crate) compaction_memory_bound_bytes: usize,
pub(crate) compaction_yield_after_n_updates: usize,
pub(crate) compaction_yield_after_ms: u64,
pub(crate) version: semver::Version,
pub(crate) batch: BatchBuilderConfig,
pub(crate) fetch_config: FetchConfig,
Expand All @@ -99,6 +100,7 @@ impl CompactConfig {
CompactConfig {
compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
compaction_yield_after_ms: value.compaction_yield_after_ms,
version: value.build_version.clone(),
batch: BatchBuilderConfig::new(value, shard_id),
fetch_config: FetchConfig::from_persist_config(value),
Expand Down Expand Up @@ -1031,6 +1033,7 @@ where
.next_chunk(
cfg.compaction_yield_after_n_updates,
cfg.batch.blob_target_size - total_bytes,
Some(std::time::Duration::from_millis(cfg.compaction_yield_after_ms)),
)
.await?
else {
Expand Down
24 changes: 20 additions & 4 deletions src/persist-client/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
Expand Down Expand Up @@ -680,23 +681,33 @@ where
Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
}

fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
fn chunk(&mut self, max_len: usize, max_bytes: usize, max_time: Option<Duration>) -> Option<Part> {
let Some(mut iter) = self.iter() else {
return None;
};

let parts = iter.parts.clone();
let start = Instant::now();

// Keep a running estimate of the size left in the budget, returning None once
// budget is 0.
// budget is 0 or time threshold is exceeded.
// Note that we can't use take_while here - that method drops the first non-matching
// element, but we want to leave any data that we don't return in state for future
// calls to `next`/`next_chunk`.
let mut budget = max_bytes;
let mut count = 0usize;
let iter = std::iter::from_fn(move || {
if budget == 0 {
return None;
}
// Check time threshold periodically (every 1000 iterations) to avoid
// excessive Instant::now() calls while still yielding reasonably often.
if let Some(max_time) = max_time {
count += 1;
if count % 1000 == 0 && start.elapsed() >= max_time {
return None;
}
}
let update @ (_, kv, _, _) = iter.next()?;
// Budget for the K/V size plus two 8-byte Codec64 values.
budget = budget.saturating_sub(kv_size(kv) + 16);
Expand All @@ -711,14 +722,19 @@ where
/// Wait until data is available, then return the next consolidated chunk of
/// output. A return value of `None` means all the data has been exhausted and
/// the full consolidated dataset has been returned.
///
/// `max_len` and `max_bytes` bound the size of the returned chunk. `max_time`,
/// when provided, bounds how long chunk assembly may run before returning
/// early so the caller gets a chance to yield; this keeps long-running
/// synchronous work from blocking the runtime (e.g. delaying heartbeat
/// tasks). The elapsed-time check inside `chunk` is only performed
/// periodically, so the bound is approximate, not exact.
pub(crate) async fn next_chunk(
    &mut self,
    max_len: usize,
    max_bytes: usize,
    max_time: Option<Duration>,
) -> anyhow::Result<Option<Part>> {
    // Discard already-consumed state, then block until more data is ready.
    self.trim();
    self.unblock_progress().await?;
    // Assemble the next chunk under the given size/time limits.
    let next = self.chunk(max_len, max_bytes, max_time);
    Ok(next)
}

/// The size of the data that we _might_ be holding concurrently in memory. While this is
Expand Down Expand Up @@ -1186,7 +1202,7 @@ mod tests {
let mut out = vec![];
loop {
consolidator.trim();
let Some(chunk) = consolidator.chunk(1000, 1000) else {
let Some(chunk) = consolidator.chunk(1000, 1000, None) else {
break;
};
if chunk.len() > 0 {
Expand Down
2 changes: 1 addition & 1 deletion src/persist-client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ where
} = self;

let part = consolidator
.next_chunk(*max_len, *max_bytes)
.next_chunk(*max_len, *max_bytes, None)
.await
.expect("fetching a leased part")?;
let key_decoder = self
Expand Down