Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ categories = ["network-programming", "web-programming::http-server", "web-progra
http = "1"
http-body = "1"
http-body-util = "0.1"
bytes = { version = "1.5", features = ["serde"] }
bytes = { version = "1.6", features = ["serde"] }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming this floor bump (1.5 → 1.6, for Bytes::try_into_mut in the new tests) is intentional — it raises the minimum version of a publicly re-exported dependency for the sake of a test helper. Seems fine given how old 1.6 is; only flagging so it's a conscious choice.


# Tower service abstraction
tower = { version = "0.5", features = ["util", "buffer"] }
Expand Down
150 changes: 129 additions & 21 deletions connectrpc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,7 @@ pub trait CompressionProvider: Send + Sync + 'static {
fn decompress_with_limit(&self, data: &[u8], max_size: usize) -> Result<Bytes, ConnectError> {
use std::io::Read;
let reader = self.decompressor(data)?;
let capacity = if max_size < 64 * 1024 * 1024 {
max_size.saturating_add(1)
} else {
256
};
let capacity = initial_decompress_capacity(data.len(), 2, Some(max_size));
let mut buf = Vec::with_capacity(capacity);
reader
.take((max_size as u64).saturating_add(1))
Expand Down Expand Up @@ -752,13 +748,7 @@ impl GzipProvider {
let deflate_start = gzip_header_len(data)?;
let stream_data = &data[deflate_start..];

let capacity = match max_size {
// For very large limits (e.g. usize::MAX),
// use a growth-based strategy instead of pre-allocating.
Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1),
_ => data.len().saturating_mul(2).max(256),
};
let mut output = Vec::with_capacity(capacity);
let mut output = Vec::with_capacity(initial_decompress_capacity(data.len(), 2, max_size));

// Decompress the deflate stream, letting the decompressor find its
// own end-of-stream marker rather than pre-slicing.
Expand All @@ -773,7 +763,16 @@ impl GzipProvider {
"decompressed size exceeds limit {limit}"
)));
}
output.reserve(output.len().max(4096));
// Grow on demand, but never reserve past `limit + 1`: once the
// buffer fills at that point the over-limit check above fires,
// so the peak allocation for an over-limit payload stays the
// same as it was with a limit-sized pre-allocation.
let mut additional = output.len().max(4096);
if let Some(limit) = max_size {
additional =
additional.min(limit.saturating_add(1).saturating_sub(output.capacity()));
}
output.reserve_exact(additional);
}
let status = decompressor
.decompress_vec(
Expand Down Expand Up @@ -1037,14 +1036,8 @@ impl ZstdProvider {
let mut decoder = zstd::Decoder::new(data)
.map_err(|e| ConnectError::internal(format!("zstd decompression failed: {e}")))?;

// Pre-size the output buffer using the same heuristic as GzipProvider:
// for reasonable limits, reserve limit+1; for huge/no limits, guess
// from input size. Avoids repeated reallocation in read_to_end.
let capacity = match max_size {
Some(limit) if limit < 64 * 1024 * 1024 => limit.saturating_add(1),
_ => data.len().saturating_mul(4).max(256),
};
let mut decompressed = Vec::with_capacity(capacity);
let mut decompressed =
Vec::with_capacity(initial_decompress_capacity(data.len(), 4, max_size));

match max_size {
Some(limit) => {
Expand Down Expand Up @@ -1124,6 +1117,32 @@ impl StreamingCompressionProvider for ZstdProvider {
// Tests
// ============================================================================

/// Initial output-buffer capacity for buffered decompression.
///
/// The output buffer becomes the backing allocation of the returned `Bytes`,
/// so it is sized from the compressed input rather than from the configured
/// limit — a limit-sized allocation would stay resident for the lifetime of
/// every (possibly tiny) message. The guess is `input_len × multiplier`
/// (gzip and the trait default use 2; zstd uses 4 because it typically
/// achieves higher ratios on RPC payloads), with a 256-byte floor, capped at
/// `limit + 1` so the initial allocation never exceeds what the limit allows.
///
/// Callers grow the buffer on demand and enforce the limit as it grows; the
/// `read_to_end`-based callers may transiently reserve up to roughly twice
/// the bytes actually written (amortized growth), still bounded by their
/// `Read::take(limit + 1)` readers.
fn initial_decompress_capacity(
input_len: usize,
multiplier: usize,
max_size: Option<usize>,
) -> usize {
let mut capacity = input_len.saturating_mul(multiplier).max(256);
if let Some(limit) = max_size {
capacity = capacity.min(limit.saturating_add(1));
}
capacity
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1390,6 +1409,95 @@ mod tests {
assert_eq!(&decompressed[..], data);
}

/// Limit used by the small-message allocation tests: the default
/// per-message limit configured by `Limits::default()`.
const ALLOCATION_TEST_LIMIT: usize = 4 * 1024 * 1024;

/// Returns the capacity of the allocation backing `bytes`.
///
/// `Bytes::try_into_mut` reuses the original allocation when the handle
/// is unique, so the resulting `BytesMut::capacity()` exposes how much
/// memory the decompressed message actually retains.
fn backing_capacity(bytes: Bytes) -> usize {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper implicitly relies on Bytes::from(Vec) keeping the original capacity — the very behavior the production change is working around. If a future bytes release ever shrank in From<Vec>, these three tests would keep passing regardless of whether the sizing fix is present, i.e. they'd silently stop guarding the regression. A small control assertion (build a Bytes from a deliberately over-allocated Vec and check backing_capacity reports the large value) would keep them honest.

bytes
.try_into_mut()
.expect("freshly decompressed Bytes has no other references")
.capacity()
}

/// Upper bound on the backing allocation accepted for a tiny decompressed
/// message. The sizing heuristic yields 256 bytes today; this leaves
/// headroom for modest changes while still failing if a limit-sized (or
/// even tens-of-KiB) buffer is retained per message.
const SMALL_MESSAGE_RETENTION_BOUND: usize = 4096;

/// `backing_capacity` must actually observe over-allocation — otherwise
/// the small-message tests below could pass vacuously if `Bytes::from`
/// ever started shrinking the allocation itself.
#[test]
fn test_backing_capacity_observes_overallocation() {
let mut vec = Vec::with_capacity(1024 * 1024);
vec.extend_from_slice(b"tiny payload");
let capacity = backing_capacity(Bytes::from(vec));
assert!(
capacity >= 1024 * 1024,
"expected the over-allocated backing buffer to be visible, got {capacity}"
);
}

/// Decompressing a small gzip message must not retain a buffer sized by
/// the configured limit: the returned `Bytes` should be backed by an
/// allocation proportional to the actual message.
#[cfg(feature = "gzip")]
#[test]
fn test_gzip_decompress_small_message_allocation() {
let provider = GzipProvider::default();
let compressed = provider.compress(b"tiny payload").unwrap();
let out = provider
.decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT)
.unwrap();
assert_eq!(&out[..], b"tiny payload");
let capacity = backing_capacity(out);
assert!(
capacity < SMALL_MESSAGE_RETENTION_BOUND,
"small gzip message retained a {capacity}-byte backing buffer"
);
}

/// Same as the gzip allocation test, for the zstd provider.
#[cfg(feature = "zstd")]
#[test]
fn test_zstd_decompress_small_message_allocation() {
let provider = ZstdProvider::default();
let compressed = provider.compress(b"tiny payload").unwrap();
let out = provider
.decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT)
.unwrap();
assert_eq!(&out[..], b"tiny payload");
let capacity = backing_capacity(out);
assert!(
capacity < SMALL_MESSAGE_RETENTION_BOUND,
"small zstd message retained a {capacity}-byte backing buffer"
);
}

/// Same as the gzip allocation test, for the trait's default
/// `decompress_with_limit` implementation (used by custom providers).
#[test]
fn test_default_trait_decompress_small_message_allocation() {
let provider = MockProvider;
let compressed = provider.compress(b"tiny payload").unwrap();
let out = provider
.decompress_with_limit(&compressed, ALLOCATION_TEST_LIMIT)
.unwrap();
assert_eq!(&out[..], b"tiny payload");
let capacity = backing_capacity(out);
assert!(
capacity < SMALL_MESSAGE_RETENTION_BOUND,
"small message retained a {capacity}-byte backing buffer via the default impl"
);
}

/// Run `f` on a separate thread and require it to produce a result within
/// `timeout`, failing the test immediately otherwise.
///
Expand Down
Loading