Merged
89 changes: 88 additions & 1 deletion docs/zero-copy.md
@@ -87,7 +87,51 @@ MemorySegment decoded = dctx.decompress(arena, frame); // header-sized, exact le
| decompress | `decompress(dst, src)` → bytes written | `decompress(arena, frame)` → output segment |

The arena form of `decompress` requires the frame to store its decompressed size.
One-shot `compress` always stamps it; a *streamed* frame only does so when you
pledge the size up front (see [Pledged size](#pledged-size-unlocks-zero-copy-decode)).
For size-less frames, size `dst` yourself.

## ByteBuffer interop

Much of the Java ecosystem speaks `ByteBuffer`, not `MemorySegment` — NIO
channels, Netty, and `FileChannel.map`'s `MappedByteBuffer`. We deliberately do
**not** add a third set of `ByteBuffer` overloads: the segment API already
bridges both directions of the FFM↔NIO boundary at zero copy, because FFM defines
the conversions.

- **`ByteBuffer` in**: wrap a *direct* buffer as a segment with
  `MemorySegment.ofBuffer(buf)`. The wrap itself never copies, and the segment
  covers the buffer from `position()` to `limit()`. A heap-backed buffer yields a
  heap segment, which carries the same copy caveat as `byte[]`. Hand the segment
  to `compress` / `decompress`.
- **`MemorySegment` out to `ByteBuffer`**: `segment.asByteBuffer()` returns a
  buffer view over the native bytes, no copy (the segment must be no larger than
  `Integer.MAX_VALUE` bytes). The decompressed arena segment is consumable by an
  existing `ByteBuffer` pipeline as-is.

```java
// an mmap'd frame is already a direct ByteBuffer (FileChannel.map)
MemorySegment frame = MemorySegment.ofBuffer(mappedByteBuffer);
MemorySegment out = dctx.decompress(arena, frame); // zero-copy decode
ByteBuffer result = out.asByteBuffer(); // zero-copy hand-off
```
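
To make the direct-versus-heap distinction concrete, here is a small sketch that uses only the JDK (no zstd calls). The class and method names are illustrative, not part of this library. It checks whether a wrapped buffer is backed by native memory, and shows that the wrapped segment is a live window over the buffer's `position()`..`limit()` range rather than a copy.

```java
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;

// Illustrative helper, not part of the library's API.
final class BufferWrapDemo {

    /** True when wrapping {@code buf} yields a native (off-heap) segment. */
    static boolean wrapsAsNative(ByteBuffer buf) {
        return MemorySegment.ofBuffer(buf).isNative();
    }

    /** Wraps the bytes between position and limit; no bytes are copied. */
    static MemorySegment window(ByteBuffer buf) {
        return MemorySegment.ofBuffer(buf);
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        ByteBuffer heap = ByteBuffer.wrap(new byte[16]);

        System.out.println("direct wraps as native: " + wrapsAsNative(direct));
        System.out.println("heap wraps as native:   " + wrapsAsNative(heap));

        // Narrow the buffer, then wrap: the segment sees only bytes 4..11.
        direct.position(4).limit(12);
        MemorySegment view = window(direct);
        System.out.println("window size: " + view.byteSize());

        // A write through the segment is visible through the buffer (aliasing).
        view.set(ValueLayout.JAVA_BYTE, 0, (byte) 42);
        System.out.println("buffer byte at index 4: " + direct.get(4));
    }
}
```

The practical consequence of the position/limit rule is that a buffer which was filled but never flipped wraps as an empty or wrong-sized segment, so flip (or otherwise set the bounds) before wrapping.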

**Byte order.** `asByteBuffer()` on a *native* segment already returns a **direct**
buffer aliasing the same off-heap bytes — there is no copy and nothing to convert.
The one wart is byte order: it comes back `BIG_ENDIAN` regardless of platform, so a
caller doing multi-byte reads must restore the native order:

```java
import java.nio.ByteOrder;

ByteBuffer result = dctx.decompress(arena, frame)
        .asByteBuffer()
        .order(ByteOrder.nativeOrder()); // direct buffer, native order, zero copy
```

(For a pure byte payload the order does not matter and even that call is unneeded.)
The remaining caveat is lifetime: the buffer borrows the arena's scope, so it must
not outlive the `try`-with-resources; touching it after the arena closes throws
`IllegalStateException`. A thin `toByteBuffer()` convenience on the arena-returning
results could fold the `order(nativeOrder())` call into one place, but it would be a
one-line output adapter, not a new capability, since the conversion already exists.
We keep the API segment-first, with no parallel `ByteBuffer` surface to maintain.
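
Both caveats can be observed with the JDK alone. The sketch below is not library code: `toNativeOrderBuffer` is a hypothetical stand-in for the one-line adapter discussed above, and `viewDiesWithArena` is an illustrative name. It shows the default byte order of the view, the effect of restoring native order, and what happens when a view escapes its arena.

```java
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only; these names are assumptions, not part of the library.
final class SegmentBufferView {

    /** Hypothetical adapter: a zero-copy buffer view switched to native byte order. */
    static ByteBuffer toNativeOrderBuffer(MemorySegment segment) {
        return segment.asByteBuffer().order(ByteOrder.nativeOrder());
    }

    /** Returns true if reading a view after its arena has closed is rejected. */
    static boolean viewDiesWithArena() {
        ByteBuffer escaped;
        try (Arena arena = Arena.ofConfined()) {
            escaped = toNativeOrderBuffer(arena.allocate(ValueLayout.JAVA_INT));
        } // the arena closes here and its native memory is released
        try {
            escaped.get(0);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        try (Arena arena = Arena.ofConfined()) {
            MemorySegment seg = arena.allocate(ValueLayout.JAVA_INT);
            seg.set(ValueLayout.JAVA_INT, 0, 0x01020304); // written in native order

            // The raw view is always BIG_ENDIAN, whatever the platform.
            System.out.println("default order: " + seg.asByteBuffer().order());

            // With native order restored, the int reads back as it was written.
            int value = toNativeOrderBuffer(seg).getInt(0);
            System.out.println("native-order read: 0x" + Integer.toHexString(value));
        }
        System.out.println("escaped view rejected: " + viewDiesWithArena());
    }
}
```

If a result genuinely has to outlive the arena, copying it out (for example into a `byte[]`) before the `try` block ends is the safe option, at the cost of the copy this section is trying to avoid.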

## Zero-copy streaming

@@ -117,3 +161,46 @@ try (ZstdCompressStream cs = new ZstdCompressStream(level)) {

Both drivers take an optional `ZstdDictionary`. Decompression mirrors the loop,
calling `decompress(dst, src)` until a result `isComplete()` (frame fully decoded).

## Pledged size unlocks zero-copy decode

Streaming compression has a hidden cost the one-shot path does not: **a streamed
frame does not record its decompressed size.** zstd writes the content-size field
in the frame header only when the encoder knows the total up front — trivially
true for `ZSTD_compress`, but a streaming encoder is fed incrementally and closes
the frame without ever being told the total.
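
To see what "records its decompressed size" means on the wire, the following sketch reads the optional `Frame_Content_Size` field straight from a frame header. It follows the public zstd frame format (RFC 8878) as I understand it and is not how this library does it; the library exposes the same information through `ZstdFrame.header(frame).contentSize()`. The class name `FrameContentSize` is illustrative, and skippable frames are simply rejected.

```java
import java.util.OptionalLong;

// Illustrative header reader based on the zstd frame format; not library code.
final class FrameContentSize {
    private static final long MAGIC = 0xFD2FB528L; // stored little-endian: 28 B5 2F FD

    /** Returns the content size recorded in a zstd frame header, if present. */
    static OptionalLong contentSize(byte[] frame) {
        if (frame.length < 5 || readLittleEndian(frame, 0, 4) != MAGIC) {
            throw new IllegalArgumentException("not a zstd frame");
        }
        int descriptor = frame[4] & 0xFF;
        int fcsFlag = descriptor >>> 6;                   // bits 7-6
        boolean singleSegment = (descriptor & 0x20) != 0; // bit 5
        int dictIdFlag = descriptor & 0x03;               // bits 1-0

        // Flag 0 means "no size field" unless the frame is single-segment.
        int fcsBytes = switch (fcsFlag) {
            case 0 -> singleSegment ? 1 : 0;
            case 1 -> 2;
            case 2 -> 4;
            default -> 8;
        };
        if (fcsBytes == 0) {
            return OptionalLong.empty(); // the size was never recorded
        }

        int windowBytes = singleSegment ? 0 : 1;            // Window_Descriptor
        int dictIdBytes = dictIdFlag == 3 ? 4 : dictIdFlag; // 0, 1, 2 or 4 bytes
        int offset = 5 + windowBytes + dictIdBytes;
        if (frame.length < offset + fcsBytes) {
            throw new IllegalArgumentException("truncated frame header");
        }
        long value = readLittleEndian(frame, offset, fcsBytes);
        // The 2-byte encoding stores (size - 256).
        return OptionalLong.of(fcsBytes == 2 ? value + 256 : value);
    }

    private static long readLittleEndian(byte[] bytes, int offset, int length) {
        long value = 0;
        for (int i = 0; i < length; i++) {
            value |= (bytes[offset + i] & 0xFFL) << (8 * i);
        }
        return value;
    }

    public static void main(String[] args) {
        // Hand-built headers: single-segment with a 1-byte size, and one with no size field.
        byte[] sized = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, 0x20, 0x05};
        byte[] unsized = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, 0x00, 0x58};
        System.out.println(contentSize(sized));
        System.out.println(contentSize(unsized));
    }
}
```

A frame whose header yields an empty result here is exactly the kind of frame the arena form of `decompress` cannot size.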

That field is exactly what the zero-copy decode path reads to size the output
arena. So a plain `ZstdOutputStream` frame **cannot be decoded zero-copy**:

```java
byte[] frame = streamCompress(data, 6);        // streamed, no pledged size
MemorySegment src = Zstd.copyIn(arena, frame); // native copy of the frame
Zstd.decompressedSize(src);                    // throws: "decompressed size not stored in frame"
dctx.decompress(arena, src);                   // same: it can't size the arena
```

The consumer is forced back onto the bounded streaming decoder (allocate, decode a
chunk, grow, repeat) or a guessed `maxSize` — the very heap-bounce the segment API
exists to avoid.

`ZstdOutputStream.withPledgedSize(out, level, total)` closes the loop. Tell the
encoder the total before the first byte and it stamps the content size into the
header, so a downstream reader can size the output arena exactly and decode in one
shot:

```java
try (var zout = ZstdOutputStream.withPledgedSize(sink, 6, data.length)) {
    zout.write(data); // pledge must match the bytes written
}
byte[] frame = sink.toByteArray();

// downstream, in a memory-mapped reader:
MemorySegment src = MemorySegment.ofBuffer(mmap);
MemorySegment out = dctx.decompress(arena, src); // one allocation, zero copies
```

This is the case where pledging is not a micro-optimization but a correctness
gate: it is the difference between a frame that participates in the zero-copy
decode path and one that does not. Pledge whenever the producer streams but the
total is known (file length, serialized record count, `Content-Length`). The pledge
must equal the bytes actually written — a mismatch raises an error on close.
50 changes: 50 additions & 0 deletions zstd/src/test/java/io/github/dfa1/zstd/ZstdStreamTest.java
@@ -9,6 +9,9 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Random;

@@ -200,6 +203,53 @@ void recordsContentSizeInTheFrame() throws IOException {
            assertThat(ZstdFrame.header(frame).contentSize()).hasValue(original.length);
            assertThat(Zstd.decompress(frame)).isEqualTo(original);
        }

        @Test
        void plainStreamFrameCannotBeSizedForZeroCopyDecode() throws IOException {
            // Given a streamed frame with no pledged size
            byte[] original = "no pledge ".repeat(500).getBytes(StandardCharsets.UTF_8);
            byte[] frame = streamCompress(original, 6);

            // When the zero-copy decoder asks the frame how big the output is
            try (Arena arena = Arena.ofConfined()) {
                MemorySegment src = Zstd.copyIn(arena, frame);
                ThrowingCallable result = () -> Zstd.decompressedSize(src);

                // Then it cannot answer — the content size was never recorded
                assertThatThrownBy(result)
                        .isInstanceOf(ZstdException.class)
                        .hasMessageContaining("not stored");
            }
        }

        @Test
        void pledgedFrameDecodesZeroCopyIntoArenaInOneShot() throws IOException {
            // Given a streamed frame that pledged its total up front
            byte[] original = "pledge enables zero-copy ".repeat(500).getBytes(StandardCharsets.UTF_8);
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (ZstdOutputStream zout = ZstdOutputStream.withPledgedSize(sink, 6, original.length)) {
                zout.write(original);
            }
            byte[] frame = sink.toByteArray();

            // a memory-mapped reader sees the frame as a direct ByteBuffer — no heap copy in
            ByteBuffer mmap = ByteBuffer.allocateDirect(frame.length).put(frame).flip();

            // When it decodes straight into its arena and hands the result back as a ByteBuffer
            byte[] restored;
            try (Arena arena = Arena.ofConfined();
                 ZstdDecompressCtx dctx = new ZstdDecompressCtx()) {
                MemorySegment src = MemorySegment.ofBuffer(mmap); // zero-copy input view
                MemorySegment out = dctx.decompress(arena, src);  // one allocation, zero copies
                ByteBuffer result = out.asByteBuffer();           // zero-copy hand-off out

                // Then the arena was sized exactly from the header and decode round-trips
                assertThat(out.byteSize()).isEqualTo(original.length);
                restored = new byte[result.remaining()];
                result.get(restored);
            }
            assertThat(restored).isEqualTo(original);
        }
    }

    private static byte[] streamCompress(byte[] data, int level) throws IOException {