diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index d39caa9868b46..2701b478144c5 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -129,6 +129,10 @@ pub struct PersistConfig {
     /// In Compactor::compact_and_apply_background, how many updates to encode or
     /// decode before voluntarily yielding the task.
     pub compaction_yield_after_n_updates: usize,
+    /// In Compactor::compact_and_apply_background, maximum time (in milliseconds)
+    /// to spend processing a single chunk before returning to allow yielding.
+    /// This prevents long-running synchronous work from blocking the runtime.
+    pub compaction_yield_after_ms: u64,
     /// Length of time after a writer's last operation after which the writer
     /// may be expired.
     pub writer_lease_duration: Duration,
@@ -175,6 +179,7 @@ impl PersistConfig {
             compaction_concurrency_limit: 5,
             compaction_queue_size: 20,
             compaction_yield_after_n_updates: 100_000,
+            compaction_yield_after_ms: 10,
             writer_lease_duration: 60 * Duration::from_secs(60),
             critical_downgrade_interval: Duration::from_secs(30),
             isolated_runtime_worker_threads: num_cpus::get(),
diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs
index cf315bfc3c7f5..1deef33a87664 100644
--- a/src/persist-client/src/internal/compact.rs
+++ b/src/persist-client/src/internal/compact.rs
@@ -87,6 +87,7 @@ pub struct CompactRes {
 pub struct CompactConfig {
     pub(crate) compaction_memory_bound_bytes: usize,
     pub(crate) compaction_yield_after_n_updates: usize,
+    pub(crate) compaction_yield_after_ms: u64,
     pub(crate) version: semver::Version,
     pub(crate) batch: BatchBuilderConfig,
     pub(crate) fetch_config: FetchConfig,
@@ -99,6 +100,7 @@ impl CompactConfig {
         CompactConfig {
             compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
             compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
+            compaction_yield_after_ms: value.compaction_yield_after_ms,
             version: value.build_version.clone(),
             batch: BatchBuilderConfig::new(value, shard_id),
             fetch_config: FetchConfig::from_persist_config(value),
@@ -1031,6 +1033,7 @@ where
                 .next_chunk(
                     cfg.compaction_yield_after_n_updates,
                     cfg.batch.blob_target_size - total_bytes,
+                    Some(std::time::Duration::from_millis(cfg.compaction_yield_after_ms)),
                 )
                 .await?
             else {
diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs
index 050062e4ec68a..afffb6ba26a82 100644
--- a/src/persist-client/src/iter.rs
+++ b/src/persist-client/src/iter.rs
@@ -16,6 +16,7 @@ use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::mem;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use anyhow::anyhow;
 use arrow::array::{Array, Int64Array};
@@ -680,23 +681,33 @@ where
         Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
     }
 
-    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option {
+    fn chunk(&mut self, max_len: usize, max_bytes: usize, max_time: Option<Duration>) -> Option {
         let Some(mut iter) = self.iter() else {
            return None;
         };
         let parts = iter.parts.clone();
+        let start = Instant::now();
 
         // Keep a running estimate of the size left in the budget, returning None once
-        // budget is 0.
+        // budget is 0 or time threshold is exceeded.
         // Note that we can't use take_while here - that method drops the first non-matching
         // element, but we want to leave any data that we don't return in state for future
         // calls to `next`/`next_chunk`.
         let mut budget = max_bytes;
+        let mut count = 0usize;
         let iter = std::iter::from_fn(move || {
             if budget == 0 {
                 return None;
             }
+            // Check time threshold periodically (every 1000 iterations) to avoid
+            // excessive Instant::now() calls while still yielding reasonably often.
+            if let Some(max_time) = max_time {
+                count += 1;
+                if count % 1000 == 0 && start.elapsed() >= max_time {
+                    return None;
+                }
+            }
             let update @ (_, kv, _, _) = iter.next()?;
             // Budget for the K/V size plus two 8-byte Codec64 values.
             budget = budget.saturating_sub(kv_size(kv) + 16);
@@ -711,14 +722,19 @@ where
     /// Wait until data is available, then return an iterator over the next
     /// consolidated chunk of output. If this method returns `None`, that all the data has been
     /// exhausted and the full consolidated dataset has been returned.
+    ///
+    /// `max_time` limits how long the chunk processing can run before returning early
+    /// to allow yielding. This prevents long-running synchronous work from blocking
+    /// the runtime and causing heartbeat task delays.
     pub(crate) async fn next_chunk(
         &mut self,
         max_len: usize,
         max_bytes: usize,
+        max_time: Option<Duration>,
     ) -> anyhow::Result> {
         self.trim();
         self.unblock_progress().await?;
-        Ok(self.chunk(max_len, max_bytes))
+        Ok(self.chunk(max_len, max_bytes, max_time))
     }
 
     /// The size of the data that we _might_ be holding concurrently in memory. While this is
@@ -1186,7 +1202,7 @@ mod tests {
         let mut out = vec![];
         loop {
             consolidator.trim();
-            let Some(chunk) = consolidator.chunk(1000, 1000) else {
+            let Some(chunk) = consolidator.chunk(1000, 1000, None) else {
                 break;
             };
             if chunk.len() > 0 {
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index 21ee955db815a..f9b062a00c284 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -937,7 +937,7 @@ where
         } = self;
 
         let part = consolidator
-            .next_chunk(*max_len, *max_bytes)
+            .next_chunk(*max_len, *max_bytes, None)
             .await
             .expect("fetching a leased part")?;
         let key_decoder = self
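
For context on the mechanism: the `chunk` loop in `iter.rs` already stops early once a byte budget is spent, and this change adds an optional wall-clock budget that is consulted only once every 1000 updates so the hot path stays a cheap counter increment; the compactor feeds `compaction_yield_after_ms` through `CompactConfig` into that budget. The sketch below is a minimal, self-contained illustration of the same pattern outside persist; `bounded_chunk`, `check_every`, and the `Vec<u8>` items are hypothetical stand-ins rather than the persist-client API, and the real consolidator keeps unreturned data in its own state instead of a caller-owned iterator.

```rust
use std::time::{Duration, Instant};

/// Pull items from `iter` until either the byte budget is spent or the
/// optional time budget has elapsed, checking the clock only every
/// `check_every` items. Leftover items remain in `iter` for the next call.
fn bounded_chunk(
    iter: &mut impl Iterator<Item = Vec<u8>>,
    max_bytes: usize,
    max_time: Option<Duration>,
    check_every: usize,
) -> Vec<Vec<u8>> {
    let start = Instant::now();
    let mut budget = max_bytes;
    let mut count = 0usize;
    let mut out = Vec::new();
    loop {
        // Stop once the byte budget is spent...
        if budget == 0 {
            break;
        }
        // ...or, every `check_every` items, once the time budget has elapsed.
        if let Some(max_time) = max_time {
            count += 1;
            if count % check_every == 0 && start.elapsed() >= max_time {
                break;
            }
        }
        // Only pull the next item after the budget checks pass, so nothing is
        // dropped: anything not returned stays in `iter` for a later chunk.
        let Some(item) = iter.next() else { break };
        budget = budget.saturating_sub(item.len());
        out.push(item);
    }
    out
}

fn main() {
    let mut items = (0..100_000u32).map(|i| i.to_le_bytes().to_vec());
    let mut chunks = 0;
    loop {
        // A 64 KiB byte budget and a 10 ms time budget, checked every 1000 items.
        let chunk = bounded_chunk(&mut items, 64 * 1024, Some(Duration::from_millis(10)), 1000);
        if chunk.is_empty() {
            break;
        }
        chunks += 1;
        // In the compactor, this is the point where the task would yield back
        // to the async runtime before asking for the next chunk.
    }
    println!("drained the input in {chunks} chunks");
}
```

Calling `Instant::now()` on every update would add noticeable overhead to a loop that otherwise does only a comparison and a subtraction; amortizing the check over 1000 updates keeps that cost negligible while still bounding how far a chunk can overrun its deadline, which is what lets `compact_and_apply_background` yield often enough to avoid the heartbeat task delays mentioned in the doc comment.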