diff --git a/Cargo.toml b/Cargo.toml index 775c20f..f1c3033 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["network-programming", "web-programming::http-server", "web-progra http = "1" http-body = "1" http-body-util = "0.1" -bytes = { version = "1.5", features = ["serde"] } +bytes = { version = "1.6", features = ["serde"] } # Tower service abstraction tower = { version = "0.5", features = ["util", "buffer"] } diff --git a/connectrpc/src/compression.rs b/connectrpc/src/compression.rs index 5b51519..e9c9dd6 100644 --- a/connectrpc/src/compression.rs +++ b/connectrpc/src/compression.rs @@ -127,11 +127,7 @@ pub trait CompressionProvider: Send + Sync + 'static { fn decompress_with_limit(&self, data: &[u8], max_size: usize) -> Result { use std::io::Read; let reader = self.decompressor(data)?; - let capacity = if max_size < 64 * 1024 * 1024 { - max_size.saturating_add(1) - } else { - 256 - }; + let capacity = initial_decompress_capacity(data.len(), 2, Some(max_size)); let mut buf = Vec::with_capacity(capacity); reader .take((max_size as u64).saturating_add(1)) @@ -752,13 +748,7 @@ impl GzipProvider { let deflate_start = gzip_header_len(data)?; let stream_data = &data[deflate_start..]; - let capacity = match max_size { - // For very large limits (e.g. usize::MAX), - // use a growth-based strategy instead of pre-allocating. - Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1), - _ => data.len().saturating_mul(2).max(256), - }; - let mut output = Vec::with_capacity(capacity); + let mut output = Vec::with_capacity(initial_decompress_capacity(data.len(), 2, max_size)); // Decompress the deflate stream, letting the decompressor find its // own end-of-stream marker rather than pre-slicing. @@ -773,7 +763,16 @@ impl GzipProvider { "decompressed size exceeds limit {limit}" ))); } - output.reserve(output.len().max(4096)); + // Grow on demand, but never reserve past `limit + 1`: once the + // buffer fills at that point the over-limit check above fires, + // so the peak allocation for an over-limit payload stays the + // same as it was with a limit-sized pre-allocation. + let mut additional = output.len().max(4096); + if let Some(limit) = max_size { + additional = + additional.min(limit.saturating_add(1).saturating_sub(output.capacity())); + } + output.reserve_exact(additional); } let status = decompressor .decompress_vec( @@ -1037,14 +1036,8 @@ impl ZstdProvider { let mut decoder = zstd::Decoder::new(data) .map_err(|e| ConnectError::internal(format!("zstd decompression failed: {e}")))?; - // Pre-size the output buffer using the same heuristic as GzipProvider: - // for reasonable limits, reserve limit+1; for huge/no limits, guess - // from input size. Avoids repeated reallocation in read_to_end. - let capacity = match max_size { - Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1), - _ => data.len().saturating_mul(4).max(256), - }; - let mut decompressed = Vec::with_capacity(capacity); + let mut decompressed = + Vec::with_capacity(initial_decompress_capacity(data.len(), 4, max_size)); match max_size { Some(limit) => { @@ -1124,6 +1117,32 @@ impl StreamingCompressionProvider for ZstdProvider { // Tests // ============================================================================ +/// Initial output-buffer capacity for buffered decompression. +/// +/// The output buffer becomes the backing allocation of the returned `Bytes`, +/// so it is sized from the compressed input rather than from the configured +/// limit — a limit-sized allocation would stay resident for the lifetime of +/// every (possibly tiny) message. The guess is `input_len × multiplier` +/// (gzip and the trait default use 2; zstd uses 4 because it typically +/// achieves higher ratios on RPC payloads), with a 256-byte floor, capped at +/// `limit + 1` so the initial allocation never exceeds what the limit allows. +/// +/// Callers grow the buffer on demand and enforce the limit as it grows; the +/// `read_to_end`-based callers may transiently reserve up to roughly twice +/// the bytes actually written (amortized growth), still bounded by their +/// `Read::take(limit + 1)` readers. +fn initial_decompress_capacity( + input_len: usize, + multiplier: usize, + max_size: Option, +) -> usize { + let mut capacity = input_len.saturating_mul(multiplier).max(256); + if let Some(limit) = max_size { + capacity = capacity.min(limit.saturating_add(1)); + } + capacity +} + #[cfg(test)] mod tests { use super::*; @@ -1390,6 +1409,95 @@ mod tests { assert_eq!(&decompressed[..], data); } + /// Limit used by the small-message allocation tests: the default + /// per-message limit configured by `Limits::default()`. + const ALLOCATION_TEST_LIMIT: usize = 4 * 1024 * 1024; + + /// Returns the capacity of the allocation backing `bytes`. + /// + /// `Bytes::try_into_mut` reuses the original allocation when the handle + /// is unique, so the resulting `BytesMut::capacity()` exposes how much + /// memory the decompressed message actually retains. + fn backing_capacity(bytes: Bytes) -> usize { + bytes + .try_into_mut() + .expect("freshly decompressed Bytes has no other references") + .capacity() + } + + /// Upper bound on the backing allocation accepted for a tiny decompressed + /// message. The sizing heuristic yields 256 bytes today; this leaves + /// headroom for modest changes while still failing if a limit-sized (or + /// even tens-of-KiB) buffer is retained per message. + const SMALL_MESSAGE_RETENTION_BOUND: usize = 4096; + + /// `backing_capacity` must actually observe over-allocation — otherwise + /// the small-message tests below could pass vacuously if `Bytes::from` + /// ever started shrinking the allocation itself. + #[test] + fn test_backing_capacity_observes_overallocation() { + let mut vec = Vec::with_capacity(1024 * 1024); + vec.extend_from_slice(b"tiny payload"); + let capacity = backing_capacity(Bytes::from(vec)); + assert!( + capacity >= 1024 * 1024, + "expected the over-allocated backing buffer to be visible, got {capacity}" + ); + } + + /// Decompressing a small gzip message must not retain a buffer sized by + /// the configured limit: the returned `Bytes` should be backed by an + /// allocation proportional to the actual message. + #[cfg(feature = "gzip")] + #[test] + fn test_gzip_decompress_small_message_allocation() { + let provider = GzipProvider::default(); + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < SMALL_MESSAGE_RETENTION_BOUND, + "small gzip message retained a {capacity}-byte backing buffer" + ); + } + + /// Same as the gzip allocation test, for the zstd provider. + #[cfg(feature = "zstd")] + #[test] + fn test_zstd_decompress_small_message_allocation() { + let provider = ZstdProvider::default(); + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < SMALL_MESSAGE_RETENTION_BOUND, + "small zstd message retained a {capacity}-byte backing buffer" + ); + } + + /// Same as the gzip allocation test, for the trait's default + /// `decompress_with_limit` implementation (used by custom providers). + #[test] + fn test_default_trait_decompress_small_message_allocation() { + let provider = MockProvider; + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < SMALL_MESSAGE_RETENTION_BOUND, + "small message retained a {capacity}-byte backing buffer via the default impl" + ); + } + /// Run `f` on a separate thread and require it to produce a result within /// `timeout`, failing the test immediately otherwise. ///