From 68f7ce75266954c0c45f2e2d6ede52fe69e09736 Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Fri, 26 Jun 2026 15:38:58 +0200 Subject: [PATCH 1/3] feat(zstd): decode dictionary-compressed segments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The decoder rejected `vortex.zstd` segments carrying a shared dictionary (`dictionary_size != 0`). With the native libzstd bindings this is now decodable: digest the dictionary once per segment into a `ZstdDecompressDict` and decompress each frame against it. Buffer layout follows the Rust reference — with a dictionary, buffer[0] is the dictionary and frames follow at buffer[1..]; without one, frames start at buffer[0]. Frame decompression stays zero-copy (segment to arena slice); only the small dictionary takes one heap copy, off the hot path. Flips the unit test from asserting rejection to a dictionary round-trip, and adds zstd.vortex to the decoded published-fixtures list. Co-Authored-By: Claude Opus 4.8 --- .../reader/decode/ZstdEncodingDecoder.java | 54 ++++++++++++++----- .../vortex/reader/VortexHttpReaderIT.java | 30 ++--------- .../encode/ZstdEncodingEncoderTest.java | 40 +++++++++----- 3 files changed, 71 insertions(+), 53 deletions(-) diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java index 5effe141..a108924d 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java @@ -19,10 +19,13 @@ import io.github.dfa1.vortex.reader.array.VarBinArray; import io.github.dfa1.zstd.ZstdDecompressCtx; +import io.github.dfa1.zstd.ZstdDecompressDict; +import io.github.dfa1.zstd.ZstdDictionary; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; /// Read-only decoder for `vortex.zstd`. public final class ZstdEncodingDecoder implements EncodingDecoder { @@ -49,11 +52,6 @@ public Array decode(DecodeContext ctx) { } catch (IOException e) { throw new VortexException(EncodingId.VORTEX_ZSTD, "invalid metadata", e); } - if (meta.dictionary_size() != 0) { - throw new VortexException(EncodingId.VORTEX_ZSTD, - "dictionary-compressed Zstd segments are not supported"); - } - BoolArray validity = null; if (ctx.node().children().length > 0) { Array validityArray = ctx.decodeChild(0, DType.BOOL, ctx.rowCount()); @@ -151,24 +149,52 @@ private static MemorySegment decompressFrames( // Zero-copy: decompress each native frame straight into its slice of the arena output, // no heap byte[] bounce. The mmap'd file buffers are already native; the scratch arena // only services the heap segments unit tests hand in. + // + // Buffer layout mirrors the Rust reference: with a shared dictionary, buffer[0] is the + // dictionary and the frames follow at buffer[1..]; without one, the frames start at + // buffer[0]. The frames count is metadata-driven either way. + boolean hasDictionary = meta.dictionary_size() != 0; + int frameBufferBase = hasDictionary ? 1 : 0; MemorySegment out = ctx.arena().allocate(totalUncompressed); try (ZstdDecompressCtx dctx = new ZstdDecompressCtx(); Arena scratch = Arena.ofConfined()) { - long outOffset = 0; - for (int i = 0; i < frameCount; i++) { - MemorySegment src = asNative(ctx.buffer(i), scratch); - int uncompSize = (int) meta.frames().get(i).uncompressed_size(); - long written = dctx.decompress(out.asSlice(outOffset, uncompSize), src); - if (written != uncompSize) { - throw new VortexException(EncodingId.VORTEX_ZSTD, - "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + ZstdDecompressDict dictionary = hasDictionary ? digestDictionary(ctx.buffer(0)) : null; + try { + long outOffset = 0; + for (int i = 0; i < frameCount; i++) { + MemorySegment src = asNative(ctx.buffer(frameBufferBase + i), scratch); + int uncompSize = (int) meta.frames().get(i).uncompressed_size(); + MemorySegment dst = out.asSlice(outOffset, uncompSize); + long written = dictionary == null + ? dctx.decompress(dst, src) + : dctx.decompress(dst, src, dictionary); + if (written != uncompSize) { + throw new VortexException(EncodingId.VORTEX_ZSTD, + "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + } + outOffset += uncompSize; + } + } finally { + if (dictionary != null) { + dictionary.close(); } - outOffset += uncompSize; } } return out; } + /// Digests the raw dictionary bytes carried in `dictBuffer` into a reusable native + /// decompression dictionary shared by every frame in this segment. + /// + /// The one heap copy here is off the hot path: the dictionary is digested once per segment + /// (not per frame or per row) over a small buffer, and `ZSTD_createDDict` re-copies into its + /// own native allocation regardless. Switch to a `MemorySegment` overload once the zstd + /// bindings expose one. + private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer) { + byte[] raw = dictBuffer.toArray(ValueLayout.JAVA_BYTE); + return new ZstdDecompressDict(ZstdDictionary.of(raw)); + } + /// Returns `seg` unchanged when it is already native (the production mmap path); otherwise /// copies it into `scratch` so the zero-copy native API can read it. private static MemorySegment asNative(MemorySegment seg, Arena scratch) { diff --git a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java index 81b28e23..9736f691 100644 --- a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java @@ -1,7 +1,6 @@ package io.github.dfa1.vortex.reader; import io.github.dfa1.vortex.core.model.DType; -import io.github.dfa1.vortex.core.error.VortexException; import io.github.dfa1.vortex.core.io.VortexFormat; import io.github.dfa1.vortex.reader.array.ListArray; import io.github.dfa1.vortex.reader.array.ListViewArray; @@ -15,7 +14,6 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /// Integration test: reads real Vortex files from the public S3 compatibility bucket /// via HTTP Range requests and validates structure + data. @@ -91,9 +89,9 @@ void scan_forVortex_decodesAllRows() throws Exception { "decimal.vortex", "decimal_byte_parts.vortex", "dict.vortex", "fixed_size_list.vortex", "fsst.vortex", "null.vortex", "pco.vortex", "primitives.vortex", "rle.vortex", "runend.vortex", "sequence.vortex", "sparse.vortex", "struct_nested.vortex", - "varbin.vortex", "varbinview.vortex", "zigzag.vortex" - // zstd.vortex omitted: dictionary-compressed, unsupported by the pure-Java decoder - // (see scan_zstdVortex_rejectsDictionaryCompression). + "varbin.vortex", "varbinview.vortex", "zigzag.vortex", "zstd.vortex" + // zstd.vortex is dictionary-compressed; decoded against its shared dictionary via the + // native libzstd bindings. }) void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { // Given @@ -114,28 +112,6 @@ void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { assertThat(totalRows).isGreaterThan(0); } - // The published zstd.vortex fixture is dictionary-compressed; the decoder has no Zstd - // dictionary support and must fail fast with a clear message rather than mis-decode. - // Tracked by https://github.com/dfa1/vortex-java/issues/104. - @Test - void scan_zstdVortex_rejectsDictionaryCompression() throws Exception { - // Given - URI uri = BASE.resolve("zstd.vortex"); - - // When / Then - try (var sut = VortexHttpReader.open(uri); - var iter = sut.scan(ScanOptions.all())) { - assertThatThrownBy(() -> { - while (iter.hasNext()) { - try (var c = iter.next()) { - c.rowCount(); - } - } - }) - .isInstanceOf(VortexException.class) - .hasMessageContaining("dictionary-compressed Zstd segments are not supported"); - } - } // vortex.masked / vortex.patched / vortex.variant: decoders implemented, but no S3 fixture // is published (still absent at v0.75.0) — enable this test once fixtures exist upstream. diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java index e952c2d8..f8115dc3 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java @@ -1,6 +1,8 @@ package io.github.dfa1.vortex.writer.encode; import io.github.dfa1.zstd.Zstd; +import io.github.dfa1.zstd.ZstdCompressCtx; +import io.github.dfa1.zstd.ZstdDictionary; import io.github.dfa1.vortex.core.model.DType; import io.github.dfa1.vortex.core.model.PType; import io.github.dfa1.vortex.core.error.VortexException; @@ -147,6 +149,12 @@ private static byte[] compress(byte[] input) { return Zstd.compress(input); } + private static byte[] compressWithDict(byte[] input, byte[] dict) { + try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { + return cctx.compress(input, ZstdDictionary.of(dict)); + } + } + private static byte[] metaNoDict(long[] uncompressedSizes, long[] nValues) { java.util.List frames = new java.util.ArrayList<>(); for (int i = 0; i < uncompressedSizes.length; i++) { @@ -178,18 +186,26 @@ private static byte[] toLengthPrefixed(String[] strings) { } @Test - void decode_withDictionary_throws() { - // Given — metadata with non-zero dictionary_size; pure-Java decoder doesn't support - // dictionary-compressed Zstd (no JNI dependency) - byte[] compressed = compress(toLeBytes(new int[]{1, 2, 3})); - byte[] meta = new ProtoZstdMetadata(256, - java.util.List.of(new ProtoZstdFrameMetadata(12, 3))).encode(); - DecodeContext ctx = makeDictCtx(meta, DTypes.I32, 3, new byte[256], compressed); - - // When / Then - assertThatThrownBy(() -> DECODER.decode(ctx)) - .isInstanceOf(VortexException.class) - .hasMessageContaining("dictionary"); + void decode_withDictionary_roundTrips() { + // Given — a frame compressed against a shared dictionary; metadata carries the + // dictionary size and the dict bytes live in buffer[0], frames in buffer[1..] + // (mirrors the Rust reference layout). + byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8); + int[] values = {1, 2, 3}; + byte[] raw = toLeBytes(values); + byte[] compressed = compressWithDict(raw, dict); + byte[] meta = new ProtoZstdMetadata(dict.length, + java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode(); + DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed); + + // When + IntArray result = (IntArray) DECODER.decode(ctx); + + // Then + assertThat(result.length()).isEqualTo(values.length); + for (int i = 0; i < values.length; i++) { + assertThat(result.getInt(i)).as("index %d", i).isEqualTo(values[i]); + } } @Test From e20dc10ff6dfa4f77705ba9117c2ee4272e68d07 Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Fri, 26 Jun 2026 15:44:35 +0200 Subject: [PATCH 2/3] fix(zstd): validate dictionary_size against buffer on decode The dictionary-compressed decode path used dictionary_size only as a presence flag and never checked it against the actual dictionary buffer size. Digesting a truncated or oversized dictionary would silently produce wrong output. Fail fast on mismatch, mirroring the Rust reference invariant (encodings/zstd/src/array.rs). Adds a unit test and drops a stray blank line in the IT. Co-Authored-By: Claude Opus 4.8 --- .../reader/decode/ZstdEncodingDecoder.java | 13 +++++++++++-- .../vortex/reader/VortexHttpReaderIT.java | 1 - .../encode/ZstdEncodingEncoderTest.java | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java index a108924d..f60e7d49 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java @@ -158,7 +158,7 @@ private static MemorySegment decompressFrames( MemorySegment out = ctx.arena().allocate(totalUncompressed); try (ZstdDecompressCtx dctx = new ZstdDecompressCtx(); Arena scratch = Arena.ofConfined()) { - ZstdDecompressDict dictionary = hasDictionary ? digestDictionary(ctx.buffer(0)) : null; + ZstdDecompressDict dictionary = hasDictionary ? digestDictionary(ctx.buffer(0), meta.dictionary_size()) : null; try { long outOffset = 0; for (int i = 0; i < frameCount; i++) { @@ -190,7 +190,16 @@ private static MemorySegment decompressFrames( /// (not per frame or per row) over a small buffer, and `ZSTD_createDDict` re-copies into its /// own native allocation regardless. Switch to a `MemorySegment` overload once the zstd /// bindings expose one. - private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer) { + /// + /// `declaredSize` is the metadata's `dictionary_size`; it must match the dictionary buffer's + /// byte size (the Rust reference enforces the same invariant), otherwise the segment is + /// malformed and we fail fast rather than digest a truncated dictionary. + private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer, long declaredSize) { + if (dictBuffer.byteSize() != declaredSize) { + throw new VortexException(EncodingId.VORTEX_ZSTD, + "dictionary size metadata " + declaredSize + + " does not match buffer size " + dictBuffer.byteSize()); + } byte[] raw = dictBuffer.toArray(ValueLayout.JAVA_BYTE); return new ZstdDecompressDict(ZstdDictionary.of(raw)); } diff --git a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java index 9736f691..c2a10019 100644 --- a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java @@ -112,7 +112,6 @@ void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { assertThat(totalRows).isGreaterThan(0); } - // vortex.masked / vortex.patched / vortex.variant: decoders implemented, but no S3 fixture // is published (still absent at v0.75.0) — enable this test once fixtures exist upstream. diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java index f8115dc3..591fb8ef 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java @@ -208,6 +208,25 @@ void decode_withDictionary_roundTrips() { } } + @Test + void decode_withDictionarySizeMismatch_throws() { + // Given — metadata declares a dictionary_size that does not match the dict buffer's + // actual byte size; the decoder must fail fast rather than digest a malformed + // dictionary (the Rust reference enforces the same invariant). + byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8); + int[] values = {1, 2, 3}; + byte[] raw = toLeBytes(values); + byte[] compressed = compressWithDict(raw, dict); + byte[] meta = new ProtoZstdMetadata(dict.length + 1, + java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode(); + DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed); + + // When / Then + assertThatThrownBy(() -> DECODER.decode(ctx)) + .isInstanceOf(VortexException.class) + .hasMessageContaining("dictionary size metadata"); + } + @Test void decode_nullable_primitive_scattersValuesCorrectly() { boolean[] validityBits = {true, false, true, false}; From a2c50ad4e7cad8b39661f6724a70953a17b611fa Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Fri, 26 Jun 2026 16:28:25 +0200 Subject: [PATCH 3/3] perf(zstd): zero-copy dictionary digest via MemorySegment (zstd 0.2) zstd bindings 0.2 expose ZstdDecompressDict(MemorySegment), which hands the dictionary buffer straight to ZSTD_createDDict with no intermediate heap byte[]. Drop the toArray() bounce in digestDictionary; native dictionary buffers (the mmap'd production path) now flow through unchanged, and the unit-test heap buffers go through the existing asNative() copy. Keeps the dictionary_size validation. Co-Authored-By: Claude Opus 4.8 --- pom.xml | 2 +- .../reader/decode/ZstdEncodingDecoder.java | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index 625eb99c..c8e465f8 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 25 UTF-8 - 0.1 + 0.2 4.3.0 diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java index f60e7d49..47a2cb60 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java @@ -20,12 +20,10 @@ import io.github.dfa1.zstd.ZstdDecompressCtx; import io.github.dfa1.zstd.ZstdDecompressDict; -import io.github.dfa1.zstd.ZstdDictionary; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; /// Read-only decoder for `vortex.zstd`. public final class ZstdEncodingDecoder implements EncodingDecoder { @@ -158,7 +156,9 @@ private static MemorySegment decompressFrames( MemorySegment out = ctx.arena().allocate(totalUncompressed); try (ZstdDecompressCtx dctx = new ZstdDecompressCtx(); Arena scratch = Arena.ofConfined()) { - ZstdDecompressDict dictionary = hasDictionary ? digestDictionary(ctx.buffer(0), meta.dictionary_size()) : null; + ZstdDecompressDict dictionary = hasDictionary + ? digestDictionary(asNative(ctx.buffer(0), scratch), meta.dictionary_size()) + : null; try { long outOffset = 0; for (int i = 0; i < frameCount; i++) { @@ -186,10 +186,9 @@ private static MemorySegment decompressFrames( /// Digests the raw dictionary bytes carried in `dictBuffer` into a reusable native /// decompression dictionary shared by every frame in this segment. /// - /// The one heap copy here is off the hot path: the dictionary is digested once per segment - /// (not per frame or per row) over a small buffer, and `ZSTD_createDDict` re-copies into its - /// own native allocation regardless. Switch to a `MemorySegment` overload once the zstd - /// bindings expose one. + /// Zero-copy: the dictionary buffer (an mmap'd native slice in production) is handed straight + /// to `ZSTD_createDDict`, which copies it into its own native allocation. No heap `byte[]` + /// bounce. /// /// `declaredSize` is the metadata's `dictionary_size`; it must match the dictionary buffer's /// byte size (the Rust reference enforces the same invariant), otherwise the segment is @@ -200,8 +199,7 @@ private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer, lon "dictionary size metadata " + declaredSize + " does not match buffer size " + dictBuffer.byteSize()); } - byte[] raw = dictBuffer.toArray(ValueLayout.JAVA_BYTE); - return new ZstdDecompressDict(ZstdDictionary.of(raw)); + return new ZstdDecompressDict(dictBuffer); } /// Returns `seg` unchanged when it is already native (the production mmap path); otherwise