diff --git a/pom.xml b/pom.xml index 625eb99c..c8e465f8 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 25 UTF-8 - 0.1 + 0.2 4.3.0 diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java index 5effe141..47a2cb60 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java @@ -19,6 +19,7 @@ import io.github.dfa1.vortex.reader.array.VarBinArray; import io.github.dfa1.zstd.ZstdDecompressCtx; +import io.github.dfa1.zstd.ZstdDecompressDict; import java.io.IOException; import java.lang.foreign.Arena; @@ -49,11 +50,6 @@ public Array decode(DecodeContext ctx) { } catch (IOException e) { throw new VortexException(EncodingId.VORTEX_ZSTD, "invalid metadata", e); } - if (meta.dictionary_size() != 0) { - throw new VortexException(EncodingId.VORTEX_ZSTD, - "dictionary-compressed Zstd segments are not supported"); - } - BoolArray validity = null; if (ctx.node().children().length > 0) { Array validityArray = ctx.decodeChild(0, DType.BOOL, ctx.rowCount()); @@ -151,24 +147,61 @@ private static MemorySegment decompressFrames( // Zero-copy: decompress each native frame straight into its slice of the arena output, // no heap byte[] bounce. The mmap'd file buffers are already native; the scratch arena // only services the heap segments unit tests hand in. + // + // Buffer layout mirrors the Rust reference: with a shared dictionary, buffer[0] is the + // dictionary and the frames follow at buffer[1..]; without one, the frames start at + // buffer[0]. The frames count is metadata-driven either way. + boolean hasDictionary = meta.dictionary_size() != 0; + int frameBufferBase = hasDictionary ? 1 : 0; MemorySegment out = ctx.arena().allocate(totalUncompressed); try (ZstdDecompressCtx dctx = new ZstdDecompressCtx(); Arena scratch = Arena.ofConfined()) { - long outOffset = 0; - for (int i = 0; i < frameCount; i++) { - MemorySegment src = asNative(ctx.buffer(i), scratch); - int uncompSize = (int) meta.frames().get(i).uncompressed_size(); - long written = dctx.decompress(out.asSlice(outOffset, uncompSize), src); - if (written != uncompSize) { - throw new VortexException(EncodingId.VORTEX_ZSTD, - "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + ZstdDecompressDict dictionary = hasDictionary + ? digestDictionary(asNative(ctx.buffer(0), scratch), meta.dictionary_size()) + : null; + try { + long outOffset = 0; + for (int i = 0; i < frameCount; i++) { + MemorySegment src = asNative(ctx.buffer(frameBufferBase + i), scratch); + int uncompSize = (int) meta.frames().get(i).uncompressed_size(); + MemorySegment dst = out.asSlice(outOffset, uncompSize); + long written = dictionary == null + ? dctx.decompress(dst, src) + : dctx.decompress(dst, src, dictionary); + if (written != uncompSize) { + throw new VortexException(EncodingId.VORTEX_ZSTD, + "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + } + outOffset += uncompSize; + } + } finally { + if (dictionary != null) { + dictionary.close(); } - outOffset += uncompSize; } } return out; } + /// Digests the raw dictionary bytes carried in `dictBuffer` into a reusable native + /// decompression dictionary shared by every frame in this segment. + /// + /// Zero-copy: the dictionary buffer (an mmap'd native slice in production) is handed straight + /// to `ZSTD_createDDict`, which copies it into its own native allocation. No heap `byte[]` + /// bounce. + /// + /// `declaredSize` is the metadata's `dictionary_size`; it must match the dictionary buffer's + /// byte size (the Rust reference enforces the same invariant), otherwise the segment is + /// malformed and we fail fast rather than digest a truncated dictionary. + private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer, long declaredSize) { + if (dictBuffer.byteSize() != declaredSize) { + throw new VortexException(EncodingId.VORTEX_ZSTD, + "dictionary size metadata " + declaredSize + + " does not match buffer size " + dictBuffer.byteSize()); + } + return new ZstdDecompressDict(dictBuffer); + } + /// Returns `seg` unchanged when it is already native (the production mmap path); otherwise /// copies it into `scratch` so the zero-copy native API can read it. private static MemorySegment asNative(MemorySegment seg, Arena scratch) { diff --git a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java index 81b28e23..c2a10019 100644 --- a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java @@ -1,7 +1,6 @@ package io.github.dfa1.vortex.reader; import io.github.dfa1.vortex.core.model.DType; -import io.github.dfa1.vortex.core.error.VortexException; import io.github.dfa1.vortex.core.io.VortexFormat; import io.github.dfa1.vortex.reader.array.ListArray; import io.github.dfa1.vortex.reader.array.ListViewArray; @@ -15,7 +14,6 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /// Integration test: reads real Vortex files from the public S3 compatibility bucket /// via HTTP Range requests and validates structure + data. @@ -91,9 +89,9 @@ void scan_forVortex_decodesAllRows() throws Exception { "decimal.vortex", "decimal_byte_parts.vortex", "dict.vortex", "fixed_size_list.vortex", "fsst.vortex", "null.vortex", "pco.vortex", "primitives.vortex", "rle.vortex", "runend.vortex", "sequence.vortex", "sparse.vortex", "struct_nested.vortex", - "varbin.vortex", "varbinview.vortex", "zigzag.vortex" - // zstd.vortex omitted: dictionary-compressed, unsupported by the pure-Java decoder - // (see scan_zstdVortex_rejectsDictionaryCompression). + "varbin.vortex", "varbinview.vortex", "zigzag.vortex", "zstd.vortex" + // zstd.vortex is dictionary-compressed; decoded against its shared dictionary via the + // native libzstd bindings. }) void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { // Given @@ -114,29 +112,6 @@ void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { assertThat(totalRows).isGreaterThan(0); } - // The published zstd.vortex fixture is dictionary-compressed; the decoder has no Zstd - // dictionary support and must fail fast with a clear message rather than mis-decode. - // Tracked by https://github.com/dfa1/vortex-java/issues/104. - @Test - void scan_zstdVortex_rejectsDictionaryCompression() throws Exception { - // Given - URI uri = BASE.resolve("zstd.vortex"); - - // When / Then - try (var sut = VortexHttpReader.open(uri); - var iter = sut.scan(ScanOptions.all())) { - assertThatThrownBy(() -> { - while (iter.hasNext()) { - try (var c = iter.next()) { - c.rowCount(); - } - } - }) - .isInstanceOf(VortexException.class) - .hasMessageContaining("dictionary-compressed Zstd segments are not supported"); - } - } - // vortex.masked / vortex.patched / vortex.variant: decoders implemented, but no S3 fixture // is published (still absent at v0.75.0) — enable this test once fixtures exist upstream. diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java index e952c2d8..591fb8ef 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java @@ -1,6 +1,8 @@ package io.github.dfa1.vortex.writer.encode; import io.github.dfa1.zstd.Zstd; +import io.github.dfa1.zstd.ZstdCompressCtx; +import io.github.dfa1.zstd.ZstdDictionary; import io.github.dfa1.vortex.core.model.DType; import io.github.dfa1.vortex.core.model.PType; import io.github.dfa1.vortex.core.error.VortexException; @@ -147,6 +149,12 @@ private static byte[] compress(byte[] input) { return Zstd.compress(input); } + private static byte[] compressWithDict(byte[] input, byte[] dict) { + try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { + return cctx.compress(input, ZstdDictionary.of(dict)); + } + } + private static byte[] metaNoDict(long[] uncompressedSizes, long[] nValues) { java.util.List frames = new java.util.ArrayList<>(); for (int i = 0; i < uncompressedSizes.length; i++) { @@ -178,18 +186,45 @@ private static byte[] toLengthPrefixed(String[] strings) { } @Test - void decode_withDictionary_throws() { - // Given — metadata with non-zero dictionary_size; pure-Java decoder doesn't support - // dictionary-compressed Zstd (no JNI dependency) - byte[] compressed = compress(toLeBytes(new int[]{1, 2, 3})); - byte[] meta = new ProtoZstdMetadata(256, - java.util.List.of(new ProtoZstdFrameMetadata(12, 3))).encode(); - DecodeContext ctx = makeDictCtx(meta, DTypes.I32, 3, new byte[256], compressed); + void decode_withDictionary_roundTrips() { + // Given — a frame compressed against a shared dictionary; metadata carries the + // dictionary size and the dict bytes live in buffer[0], frames in buffer[1..] + // (mirrors the Rust reference layout). + byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8); + int[] values = {1, 2, 3}; + byte[] raw = toLeBytes(values); + byte[] compressed = compressWithDict(raw, dict); + byte[] meta = new ProtoZstdMetadata(dict.length, + java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode(); + DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed); + + // When + IntArray result = (IntArray) DECODER.decode(ctx); + + // Then + assertThat(result.length()).isEqualTo(values.length); + for (int i = 0; i < values.length; i++) { + assertThat(result.getInt(i)).as("index %d", i).isEqualTo(values[i]); + } + } + + @Test + void decode_withDictionarySizeMismatch_throws() { + // Given — metadata declares a dictionary_size that does not match the dict buffer's + // actual byte size; the decoder must fail fast rather than digest a malformed + // dictionary (the Rust reference enforces the same invariant). + byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8); + int[] values = {1, 2, 3}; + byte[] raw = toLeBytes(values); + byte[] compressed = compressWithDict(raw, dict); + byte[] meta = new ProtoZstdMetadata(dict.length + 1, + java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode(); + DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed); // When / Then assertThatThrownBy(() -> DECODER.decode(ctx)) .isInstanceOf(VortexException.class) - .hasMessageContaining("dictionary"); + .hasMessageContaining("dictionary size metadata"); } @Test