diff --git a/docs/zero-copy.md b/docs/zero-copy.md index 048887a..d553107 100644 --- a/docs/zero-copy.md +++ b/docs/zero-copy.md @@ -87,7 +87,51 @@ MemorySegment decoded = dctx.decompress(arena, frame); // header-sized, exact le | decompress | `decompress(dst, src)` → bytes written | `decompress(arena, frame)` → output segment | The arena form of `decompress` requires the frame to store its decompressed size -(frames this library produces do). For size-less frames, size `dst` yourself. +(one-shot `compress` always stamps it; a *streamed* frame only does so when you +pledge the size up front — see [Pledged size](#pledged-size-unlocks-zero-copy-decode)). +For size-less frames, size `dst` yourself. + +## ByteBuffer interop + +Much of the Java ecosystem speaks `ByteBuffer`, not `MemorySegment` — NIO +channels, Netty, and `FileChannel.map`'s `MappedByteBuffer`. We deliberately do +**not** add a third set of `ByteBuffer` overloads: the segment API already +bridges both directions of the FFM↔NIO boundary at zero copy, because FFM defines +the conversions. + +- **`ByteBuffer` in** — wrap a *direct* buffer as a segment with + `MemorySegment.ofBuffer(buf)` (zero copy; a heap-backed buffer copies, the same + caveat as `byte[]`). Hand the segment to `compress` / `decompress`. +- **`MemorySegment` out to `ByteBuffer`** — `segment.asByteBuffer()` returns a + buffer view over the native bytes, no copy. The decompressed arena segment is + consumable by an existing `ByteBuffer` pipeline as-is. + +```java +// an mmap'd frame is already a direct ByteBuffer (FileChannel.map) +MemorySegment frame = MemorySegment.ofBuffer(mappedByteBuffer); +MemorySegment out = dctx.decompress(arena, frame); // zero-copy decode +ByteBuffer result = out.asByteBuffer(); // zero-copy hand-off +``` + +**Byte order.** `asByteBuffer()` on a *native* segment already returns a **direct** +buffer aliasing the same off-heap bytes — there is no copy and nothing to convert. +The one wart is byte order: it comes back `BIG_ENDIAN` regardless of platform, so a +caller doing multi-byte reads must restore the native order: + +```java +import java.nio.ByteOrder; + +ByteBuffer result = dctx.decompress(arena, frame) + .asByteBuffer() + .order(ByteOrder.nativeOrder()); // direct buffer, native order, zero copy +``` + +(For a pure byte payload the order does not matter and even that is unneeded.) The +remaining caveat is lifetime: the buffer borrows the arena's scope, so it must not +outlive the `try`-with-resources. A thin `toByteBuffer()` convenience on the +arena-returning results could fold the `order(nativeOrder())` call in one place, but +it would be a one-line output adapter, not new capability — the conversion already +exists. We keep the API segment-first (no parallel `ByteBuffer` surface to maintain). ## Zero-copy streaming @@ -117,3 +161,46 @@ try (ZstdCompressStream cs = new ZstdCompressStream(level)) { Both drivers take an optional `ZstdDictionary`. Decompression mirrors the loop, calling `decompress(dst, src)` until a result `isComplete()` (frame fully decoded). + +## Pledged size unlocks zero-copy decode + +Streaming compression has a hidden cost the one-shot path does not: **a streamed +frame does not record its decompressed size.** zstd writes the content-size field +in the frame header only when the encoder knows the total up front — trivially +true for `ZSTD_compress`, but a streaming encoder is fed incrementally and closes +the frame without ever being told the total. + +That field is exactly what the zero-copy decode path reads to size the output +arena. So a plain `ZstdOutputStream` frame **cannot be decoded zero-copy**: + +```java +byte[] frame = streamCompress(data); // no pledged size +Zstd.decompressedSize(segmentOf(frame)); // throws: "decompressed size not stored in frame" +dctx.decompress(arena, segmentOf(frame)); // same — it can't size the arena +``` + +The consumer is forced back onto the bounded streaming decoder (allocate, decode a +chunk, grow, repeat) or a guessed `maxSize` — the very heap-bounce the segment API +exists to avoid. + +`ZstdOutputStream.withPledgedSize(out, level, total)` closes the loop. Tell the +encoder the total before the first byte and it stamps the content size into the +header, so a downstream reader can size the output arena exactly and decode in one +shot: + +```java +try (var zout = ZstdOutputStream.withPledgedSize(sink, 6, data.length)) { + zout.write(data); // pledge must match the bytes written +} +byte[] frame = sink.toByteArray(); + +// downstream, in a memory-mapped reader: +MemorySegment src = MemorySegment.ofBuffer(mmap); +MemorySegment out = dctx.decompress(arena, src); // one allocation, zero copies +``` + +This is the case where pledging is not a micro-optimization but a correctness +gate: it is the difference between a frame that participates in the zero-copy +decode path and one that does not. Pledge whenever the producer streams but the +total is known (file length, serialized record count, `Content-Length`). The pledge +must equal the bytes actually written — a mismatch raises an error on close. diff --git a/zstd/src/test/java/io/github/dfa1/zstd/ZstdStreamTest.java b/zstd/src/test/java/io/github/dfa1/zstd/ZstdStreamTest.java index b4d218c..f77a437 100644 --- a/zstd/src/test/java/io/github/dfa1/zstd/ZstdStreamTest.java +++ b/zstd/src/test/java/io/github/dfa1/zstd/ZstdStreamTest.java @@ -9,6 +9,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Random; @@ -200,6 +203,53 @@ void recordsContentSizeInTheFrame() throws IOException { assertThat(ZstdFrame.header(frame).contentSize()).hasValue(original.length); assertThat(Zstd.decompress(frame)).isEqualTo(original); } + + @Test + void plainStreamFrameCannotBeSizedForZeroCopyDecode() throws IOException { + // Given a streamed frame with no pledged size + byte[] original = "no pledge ".repeat(500).getBytes(StandardCharsets.UTF_8); + byte[] frame = streamCompress(original, 6); + + // When the zero-copy decoder asks the frame how big the output is + try (Arena arena = Arena.ofConfined()) { + MemorySegment src = Zstd.copyIn(arena, frame); + ThrowingCallable result = () -> Zstd.decompressedSize(src); + + // Then it cannot answer — the content size was never recorded + assertThatThrownBy(result) + .isInstanceOf(ZstdException.class) + .hasMessageContaining("not stored"); + } + } + + @Test + void pledgedFrameDecodesZeroCopyIntoArenaInOneShot() throws IOException { + // Given a streamed frame that pledged its total up front + byte[] original = "pledge enables zero-copy ".repeat(500).getBytes(StandardCharsets.UTF_8); + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + try (ZstdOutputStream zout = ZstdOutputStream.withPledgedSize(sink, 6, original.length)) { + zout.write(original); + } + byte[] frame = sink.toByteArray(); + + // a memory-mapped reader sees the frame as a direct ByteBuffer — no heap copy in + ByteBuffer mmap = ByteBuffer.allocateDirect(frame.length).put(frame).flip(); + + // When it decodes straight into its arena and hands the result back as a ByteBuffer + byte[] restored; + try (Arena arena = Arena.ofConfined(); + ZstdDecompressCtx dctx = new ZstdDecompressCtx()) { + MemorySegment src = MemorySegment.ofBuffer(mmap); // zero-copy input view + MemorySegment out = dctx.decompress(arena, src); // one allocation, zero copies + ByteBuffer result = out.asByteBuffer(); // zero-copy hand-off out + + // Then the arena was sized exactly from the header and decode round-trips + assertThat(out.byteSize()).isEqualTo(original.length); + restored = new byte[result.remaining()]; + result.get(restored); + } + assertThat(restored).isEqualTo(original); + } } private static byte[] streamCompress(byte[] data, int level) throws IOException {