From cbd2f0e7150a04ad6b187c75cbd13fbcee177bda Mon Sep 17 00:00:00 2001 From: Iain McGinniss <309153+iainmcgin@users.noreply.github.com> Date: Wed, 27 May 2026 01:54:33 +0000 Subject: [PATCH 1/2] compression: size decompression buffers from the input, not the limit The buffered decompression paths (gzip, zstd, and the trait's default decompress_with_limit) pre-allocated max_message_size + 1 bytes for every message when the limit was below 64 MiB. Bytes::from(Vec) keeps the full allocation alive when the length is below the capacity, so every small decompressed message was backed by a limit-sized allocation for as long as the message was held. Size the initial buffer from the compressed input instead (capped at the limit), and let it grow on demand; the existing limit enforcement during growth and the Read::take bounds are unchanged. Adds tests asserting that a small decompressed message is not backed by a limit-sized allocation, for all three paths. --- Cargo.toml | 2 +- connectrpc/src/compression.rs | 115 ++++++++++++++++++++++++++++------ 2 files changed, 98 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 775c20f..f1c3033 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["network-programming", "web-programming::http-server", "web-progra http = "1" http-body = "1" http-body-util = "0.1" -bytes = { version = "1.5", features = ["serde"] } +bytes = { version = "1.6", features = ["serde"] } # Tower service abstraction tower = { version = "0.5", features = ["util", "buffer"] } diff --git a/connectrpc/src/compression.rs b/connectrpc/src/compression.rs index 519622f..2d7afc0 100644 --- a/connectrpc/src/compression.rs +++ b/connectrpc/src/compression.rs @@ -127,11 +127,16 @@ pub trait CompressionProvider: Send + Sync + 'static { fn decompress_with_limit(&self, data: &[u8], max_size: usize) -> Result { use std::io::Read; let reader = self.decompressor(data)?; - let capacity = if max_size < 64 * 1024 * 1024 { - max_size.saturating_add(1) - } else { - 256 - }; + // Size the initial buffer from the compressed input rather than the + // limit: this buffer becomes the backing allocation of the returned + // `Bytes`, so a limit-sized allocation would stay resident for the + // lifetime of every (possibly tiny) message. `read_to_end` grows the + // buffer on demand and the `take` below still bounds the total. + let capacity = data + .len() + .saturating_mul(2) + .max(256) + .min(max_size.saturating_add(1)); let mut buf = Vec::with_capacity(capacity); reader .take((max_size as u64).saturating_add(1)) @@ -752,12 +757,15 @@ impl GzipProvider { let deflate_start = gzip_header_len(data)?; let stream_data = &data[deflate_start..]; - let capacity = match max_size { - // For very large limits (e.g. usize::MAX), - // use a growth-based strategy instead of pre-allocating. - Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1), - _ => data.len().saturating_mul(2).max(256), - }; + // Size the initial buffer from the compressed input rather than the + // limit: this buffer becomes the backing allocation of the returned + // `Bytes`, so a limit-sized allocation would stay resident for the + // lifetime of every (possibly tiny) message. The loop below grows + // the buffer on demand and enforces the limit as it grows. + let mut capacity = data.len().saturating_mul(2).max(256); + if let Some(limit) = max_size { + capacity = capacity.min(limit.saturating_add(1)); + } let mut output = Vec::with_capacity(capacity); // Decompress the deflate stream, letting the decompressor find its @@ -1026,13 +1034,15 @@ impl ZstdProvider { let mut decoder = zstd::Decoder::new(data) .map_err(|e| ConnectError::internal(format!("zstd decompression failed: {e}")))?; - // Pre-size the output buffer using the same heuristic as GzipProvider: - // for reasonable limits, reserve limit+1; for huge/no limits, guess - // from input size. Avoids repeated reallocation in read_to_end. - let capacity = match max_size { - Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1), - _ => data.len().saturating_mul(4).max(256), - }; + // Size the initial buffer from the compressed input rather than the + // limit: this buffer becomes the backing allocation of the returned + // `Bytes`, so a limit-sized allocation would stay resident for the + // lifetime of every (possibly tiny) message. `read_to_end` grows the + // buffer on demand and the `take` below still bounds the total. + let mut capacity = data.len().saturating_mul(4).max(256); + if let Some(limit) = max_size { + capacity = capacity.min(limit.saturating_add(1)); + } let mut decompressed = Vec::with_capacity(capacity); match max_size { @@ -1379,6 +1389,75 @@ mod tests { assert_eq!(&decompressed[..], data); } + /// Limit used by the small-message allocation tests: the default + /// per-message limit configured by `Limits::default()`. + const ALLOCATION_TEST_LIMIT: usize = 4 * 1024 * 1024; + + /// Returns the capacity of the allocation backing `bytes`. + /// + /// `Bytes::try_into_mut` reuses the original allocation when the handle + /// is unique, so the resulting `BytesMut::capacity()` exposes how much + /// memory the decompressed message actually retains. + fn backing_capacity(bytes: Bytes) -> usize { + bytes + .try_into_mut() + .expect("freshly decompressed Bytes has no other references") + .capacity() + } + + /// Decompressing a small gzip message must not retain a buffer sized by + /// the configured limit: the returned `Bytes` should be backed by an + /// allocation proportional to the actual message. + #[cfg(feature = "gzip")] + #[test] + fn test_gzip_decompress_small_message_allocation() { + let provider = GzipProvider::default(); + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < 64 * 1024, + "small gzip message retained a {capacity}-byte backing buffer" + ); + } + + /// Same as the gzip allocation test, for the zstd provider. + #[cfg(feature = "zstd")] + #[test] + fn test_zstd_decompress_small_message_allocation() { + let provider = ZstdProvider::default(); + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < 64 * 1024, + "small zstd message retained a {capacity}-byte backing buffer" + ); + } + + /// Same as the gzip allocation test, for the trait's default + /// `decompress_with_limit` implementation (used by custom providers). + #[test] + fn test_default_trait_decompress_small_message_allocation() { + let provider = MockProvider; + let compressed = provider.compress(b"tiny payload").unwrap(); + let out = provider + .decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT) + .unwrap(); + assert_eq!(&out[..], b"tiny payload"); + let capacity = backing_capacity(out); + assert!( + capacity < 64 * 1024, + "small message retained a {capacity}-byte backing buffer via the default impl" + ); + } + #[cfg(feature = "gzip")] #[test] fn test_gzip_registry() { From 6d4b1ead0a99f2602bef643b2d53dc078bfba7d1 Mon Sep 17 00:00:00 2001 From: Iain McGinniss <309153+iainmcgin@users.noreply.github.com> Date: Wed, 27 May 2026 03:00:51 +0000 Subject: [PATCH 2/2] compression: address review feedback on decompression buffer sizing - share the initial-capacity heuristic in initial_decompress_capacity and document the gzip/zstd multiplier choice there - cap gzip buffer growth at limit + 1 so the peak allocation for an over-limit payload matches the previous pre-allocation behaviour - name the small-message retention bound used by the tests and add a control test that the capacity probe observes over-allocation --- connectrpc/src/compression.rs | 97 +++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 34 deletions(-) diff --git a/connectrpc/src/compression.rs b/connectrpc/src/compression.rs index 2d7afc0..c26be49 100644 --- a/connectrpc/src/compression.rs +++ b/connectrpc/src/compression.rs @@ -127,16 +127,7 @@ pub trait CompressionProvider: Send + Sync + 'static { fn decompress_with_limit(&self, data: &[u8], max_size: usize) -> Result { use std::io::Read; let reader = self.decompressor(data)?; - // Size the initial buffer from the compressed input rather than the - // limit: this buffer becomes the backing allocation of the returned - // `Bytes`, so a limit-sized allocation would stay resident for the - // lifetime of every (possibly tiny) message. `read_to_end` grows the - // buffer on demand and the `take` below still bounds the total. - let capacity = data - .len() - .saturating_mul(2) - .max(256) - .min(max_size.saturating_add(1)); + let capacity = initial_decompress_capacity(data.len(), 2, Some(max_size)); let mut buf = Vec::with_capacity(capacity); reader .take((max_size as u64).saturating_add(1)) @@ -757,16 +748,7 @@ impl GzipProvider { let deflate_start = gzip_header_len(data)?; let stream_data = &data[deflate_start..]; - // Size the initial buffer from the compressed input rather than the - // limit: this buffer becomes the backing allocation of the returned - // `Bytes`, so a limit-sized allocation would stay resident for the - // lifetime of every (possibly tiny) message. The loop below grows - // the buffer on demand and enforces the limit as it grows. - let mut capacity = data.len().saturating_mul(2).max(256); - if let Some(limit) = max_size { - capacity = capacity.min(limit.saturating_add(1)); - } - let mut output = Vec::with_capacity(capacity); + let mut output = Vec::with_capacity(initial_decompress_capacity(data.len(), 2, max_size)); // Decompress the deflate stream, letting the decompressor find its // own end-of-stream marker rather than pre-slicing. @@ -781,7 +763,16 @@ impl GzipProvider { "decompressed size exceeds limit {limit}" ))); } - output.reserve(output.len().max(4096)); + // Grow on demand, but never reserve past `limit + 1`: once the + // buffer fills at that point the over-limit check above fires, + // so the peak allocation for an over-limit payload stays the + // same as it was with a limit-sized pre-allocation. + let mut additional = output.len().max(4096); + if let Some(limit) = max_size { + additional = + additional.min(limit.saturating_add(1).saturating_sub(output.capacity())); + } + output.reserve_exact(additional); } let status = decompressor .decompress_vec( @@ -1034,16 +1025,8 @@ impl ZstdProvider { let mut decoder = zstd::Decoder::new(data) .map_err(|e| ConnectError::internal(format!("zstd decompression failed: {e}")))?; - // Size the initial buffer from the compressed input rather than the - // limit: this buffer becomes the backing allocation of the returned - // `Bytes`, so a limit-sized allocation would stay resident for the - // lifetime of every (possibly tiny) message. `read_to_end` grows the - // buffer on demand and the `take` below still bounds the total. - let mut capacity = data.len().saturating_mul(4).max(256); - if let Some(limit) = max_size { - capacity = capacity.min(limit.saturating_add(1)); - } - let mut decompressed = Vec::with_capacity(capacity); + let mut decompressed = + Vec::with_capacity(initial_decompress_capacity(data.len(), 4, max_size)); match max_size { Some(limit) => { @@ -1123,6 +1106,32 @@ impl StreamingCompressionProvider for ZstdProvider { // Tests // ============================================================================ +/// Initial output-buffer capacity for buffered decompression. +/// +/// The output buffer becomes the backing allocation of the returned `Bytes`, +/// so it is sized from the compressed input rather than from the configured +/// limit — a limit-sized allocation would stay resident for the lifetime of +/// every (possibly tiny) message. The guess is `input_len × multiplier` +/// (gzip and the trait default use 2; zstd uses 4 because it typically +/// achieves higher ratios on RPC payloads), with a 256-byte floor, capped at +/// `limit + 1` so the initial allocation never exceeds what the limit allows. +/// +/// Callers grow the buffer on demand and enforce the limit as it grows; the +/// `read_to_end`-based callers may transiently reserve up to roughly twice +/// the bytes actually written (amortized growth), still bounded by their +/// `Read::take(limit + 1)` readers. +fn initial_decompress_capacity( + input_len: usize, + multiplier: usize, + max_size: Option, +) -> usize { + let mut capacity = input_len.saturating_mul(multiplier).max(256); + if let Some(limit) = max_size { + capacity = capacity.min(limit.saturating_add(1)); + } + capacity +} + #[cfg(test)] mod tests { use super::*; @@ -1405,6 +1414,26 @@ mod tests { .capacity() } + /// Upper bound on the backing allocation accepted for a tiny decompressed + /// message. The sizing heuristic yields 256 bytes today; this leaves + /// headroom for modest changes while still failing if a limit-sized (or + /// even tens-of-KiB) buffer is retained per message. + const SMALL_MESSAGE_RETENTION_BOUND: usize = 4096; + + /// `backing_capacity` must actually observe over-allocation — otherwise + /// the small-message tests below could pass vacuously if `Bytes::from` + /// ever started shrinking the allocation itself. + #[test] + fn test_backing_capacity_observes_overallocation() { + let mut vec = Vec::with_capacity(1024 * 1024); + vec.extend_from_slice(b"tiny payload"); + let capacity = backing_capacity(Bytes::from(vec)); + assert!( + capacity >= 1024 * 1024, + "expected the over-allocated backing buffer to be visible, got {capacity}" + ); + } + /// Decompressing a small gzip message must not retain a buffer sized by /// the configured limit: the returned `Bytes` should be backed by an /// allocation proportional to the actual message. @@ -1419,7 +1448,7 @@ mod tests { assert_eq!(&out[..], b"tiny payload"); let capacity = backing_capacity(out); assert!( - capacity < 64 * 1024, + capacity < SMALL_MESSAGE_RETENTION_BOUND, "small gzip message retained a {capacity}-byte backing buffer" ); } @@ -1436,7 +1465,7 @@ mod tests { assert_eq!(&out[..], b"tiny payload"); let capacity = backing_capacity(out); assert!( - capacity < 64 * 1024, + capacity < SMALL_MESSAGE_RETENTION_BOUND, "small zstd message retained a {capacity}-byte backing buffer" ); } @@ -1453,7 +1482,7 @@ mod tests { assert_eq!(&out[..], b"tiny payload"); let capacity = backing_capacity(out); assert!( - capacity < 64 * 1024, + capacity < SMALL_MESSAGE_RETENTION_BOUND, "small message retained a {capacity}-byte backing buffer via the default impl" ); }