Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<maven.compiler.release>25</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- production -->
<zstd.version>0.1</zstd.version>
<zstd.version>0.2</zstd.version>


<fastcsv.version>4.3.0</fastcsv.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.github.dfa1.vortex.reader.array.VarBinArray;

import io.github.dfa1.zstd.ZstdDecompressCtx;
import io.github.dfa1.zstd.ZstdDecompressDict;

import java.io.IOException;
import java.lang.foreign.Arena;
Expand Down Expand Up @@ -49,11 +50,6 @@ public Array decode(DecodeContext ctx) {
} catch (IOException e) {
throw new VortexException(EncodingId.VORTEX_ZSTD, "invalid metadata", e);
}
if (meta.dictionary_size() != 0) {
throw new VortexException(EncodingId.VORTEX_ZSTD,
"dictionary-compressed Zstd segments are not supported");
}

BoolArray validity = null;
if (ctx.node().children().length > 0) {
Array validityArray = ctx.decodeChild(0, DType.BOOL, ctx.rowCount());
Expand Down Expand Up @@ -151,24 +147,61 @@ private static MemorySegment decompressFrames(
// Zero-copy: decompress each native frame straight into its slice of the arena output,
// no heap byte[] bounce. The mmap'd file buffers are already native; the scratch arena
// only services the heap segments unit tests hand in.
//
// Buffer layout mirrors the Rust reference: with a shared dictionary, buffer[0] is the
// dictionary and the frames follow at buffer[1..]; without one, the frames start at
// buffer[0]. The frames count is metadata-driven either way.
boolean hasDictionary = meta.dictionary_size() != 0;
int frameBufferBase = hasDictionary ? 1 : 0;
MemorySegment out = ctx.arena().allocate(totalUncompressed);
try (ZstdDecompressCtx dctx = new ZstdDecompressCtx();
Arena scratch = Arena.ofConfined()) {
long outOffset = 0;
for (int i = 0; i < frameCount; i++) {
MemorySegment src = asNative(ctx.buffer(i), scratch);
int uncompSize = (int) meta.frames().get(i).uncompressed_size();
long written = dctx.decompress(out.asSlice(outOffset, uncompSize), src);
if (written != uncompSize) {
throw new VortexException(EncodingId.VORTEX_ZSTD,
"frame " + i + ": expected " + uncompSize + " bytes, got " + written);
ZstdDecompressDict dictionary = hasDictionary
? digestDictionary(asNative(ctx.buffer(0), scratch), meta.dictionary_size())
: null;
try {
long outOffset = 0;
for (int i = 0; i < frameCount; i++) {
MemorySegment src = asNative(ctx.buffer(frameBufferBase + i), scratch);
int uncompSize = (int) meta.frames().get(i).uncompressed_size();
MemorySegment dst = out.asSlice(outOffset, uncompSize);
long written = dictionary == null
? dctx.decompress(dst, src)
: dctx.decompress(dst, src, dictionary);
if (written != uncompSize) {
throw new VortexException(EncodingId.VORTEX_ZSTD,
"frame " + i + ": expected " + uncompSize + " bytes, got " + written);
}
outOffset += uncompSize;
}
} finally {
if (dictionary != null) {
dictionary.close();
}
outOffset += uncompSize;
}
}
return out;
}

/// Digests the raw dictionary bytes carried in `dictBuffer` into a reusable native
/// decompression dictionary shared by every frame in this segment.
///
/// Zero-copy: the dictionary buffer (an mmap'd native slice in production) is handed straight
/// to `ZSTD_createDDict`, which copies it into its own native allocation. No heap `byte[]`
/// bounce.
///
/// `declaredSize` is the metadata's `dictionary_size`; it must match the dictionary buffer's
/// byte size (the Rust reference enforces the same invariant), otherwise the segment is
/// malformed and we fail fast rather than digest a truncated dictionary.
private static ZstdDecompressDict digestDictionary(MemorySegment dictBuffer, long declaredSize) {
if (dictBuffer.byteSize() != declaredSize) {
throw new VortexException(EncodingId.VORTEX_ZSTD,
"dictionary size metadata " + declaredSize
+ " does not match buffer size " + dictBuffer.byteSize());
}
return new ZstdDecompressDict(dictBuffer);
}

/// Returns `seg` unchanged when it is already native (the production mmap path); otherwise
/// copies it into `scratch` so the zero-copy native API can read it.
private static MemorySegment asNative(MemorySegment seg, Arena scratch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.dfa1.vortex.reader;

import io.github.dfa1.vortex.core.model.DType;
import io.github.dfa1.vortex.core.error.VortexException;
import io.github.dfa1.vortex.core.io.VortexFormat;
import io.github.dfa1.vortex.reader.array.ListArray;
import io.github.dfa1.vortex.reader.array.ListViewArray;
Expand All @@ -15,7 +14,6 @@
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/// Integration test: reads real Vortex files from the public S3 compatibility bucket
/// via HTTP Range requests and validates structure + data.
Expand Down Expand Up @@ -91,9 +89,9 @@ void scan_forVortex_decodesAllRows() throws Exception {
"decimal.vortex", "decimal_byte_parts.vortex", "dict.vortex", "fixed_size_list.vortex",
"fsst.vortex", "null.vortex", "pco.vortex", "primitives.vortex", "rle.vortex",
"runend.vortex", "sequence.vortex", "sparse.vortex", "struct_nested.vortex",
"varbin.vortex", "varbinview.vortex", "zigzag.vortex"
// zstd.vortex omitted: dictionary-compressed, unsupported by the pure-Java decoder
// (see scan_zstdVortex_rejectsDictionaryCompression).
"varbin.vortex", "varbinview.vortex", "zigzag.vortex", "zstd.vortex"
// zstd.vortex is dictionary-compressed; decoded against its shared dictionary via the
// native libzstd bindings.
})
void scan_publishedFixture_decodesAllRows(String fixture) throws Exception {
// Given
Expand All @@ -114,29 +112,6 @@ void scan_publishedFixture_decodesAllRows(String fixture) throws Exception {
assertThat(totalRows).isGreaterThan(0);
}

// The published zstd.vortex fixture is dictionary-compressed; the decoder has no Zstd
// dictionary support and must fail fast with a clear message rather than mis-decode.
// Tracked by https://github.com/dfa1/vortex-java/issues/104.
@Test
void scan_zstdVortex_rejectsDictionaryCompression() throws Exception {
// Given
URI uri = BASE.resolve("zstd.vortex");

// When / Then
try (var sut = VortexHttpReader.open(uri);
var iter = sut.scan(ScanOptions.all())) {
assertThatThrownBy(() -> {
while (iter.hasNext()) {
try (var c = iter.next()) {
c.rowCount();
}
}
})
.isInstanceOf(VortexException.class)
.hasMessageContaining("dictionary-compressed Zstd segments are not supported");
}
}

// vortex.masked / vortex.patched / vortex.variant: decoders implemented, but no S3 fixture
// is published (still absent at v0.75.0) — enable this test once fixtures exist upstream.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.github.dfa1.vortex.writer.encode;

import io.github.dfa1.zstd.Zstd;
import io.github.dfa1.zstd.ZstdCompressCtx;
import io.github.dfa1.zstd.ZstdDictionary;
import io.github.dfa1.vortex.core.model.DType;
import io.github.dfa1.vortex.core.model.PType;
import io.github.dfa1.vortex.core.error.VortexException;
Expand Down Expand Up @@ -147,6 +149,12 @@ private static byte[] compress(byte[] input) {
return Zstd.compress(input);
}

private static byte[] compressWithDict(byte[] input, byte[] dict) {
try (ZstdCompressCtx cctx = new ZstdCompressCtx()) {
return cctx.compress(input, ZstdDictionary.of(dict));
}
}

private static byte[] metaNoDict(long[] uncompressedSizes, long[] nValues) {
java.util.List<ProtoZstdFrameMetadata> frames = new java.util.ArrayList<>();
for (int i = 0; i < uncompressedSizes.length; i++) {
Expand Down Expand Up @@ -178,18 +186,45 @@ private static byte[] toLengthPrefixed(String[] strings) {
}

@Test
void decode_withDictionary_throws() {
// Given — metadata with non-zero dictionary_size; pure-Java decoder doesn't support
// dictionary-compressed Zstd (no JNI dependency)
byte[] compressed = compress(toLeBytes(new int[]{1, 2, 3}));
byte[] meta = new ProtoZstdMetadata(256,
java.util.List.of(new ProtoZstdFrameMetadata(12, 3))).encode();
DecodeContext ctx = makeDictCtx(meta, DTypes.I32, 3, new byte[256], compressed);
void decode_withDictionary_roundTrips() {
// Given — a frame compressed against a shared dictionary; metadata carries the
// dictionary size and the dict bytes live in buffer[0], frames in buffer[1..]
// (mirrors the Rust reference layout).
byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8);
int[] values = {1, 2, 3};
byte[] raw = toLeBytes(values);
byte[] compressed = compressWithDict(raw, dict);
byte[] meta = new ProtoZstdMetadata(dict.length,
java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode();
DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed);

// When
IntArray result = (IntArray) DECODER.decode(ctx);

// Then
assertThat(result.length()).isEqualTo(values.length);
for (int i = 0; i < values.length; i++) {
assertThat(result.getInt(i)).as("index %d", i).isEqualTo(values[i]);
}
}

@Test
void decode_withDictionarySizeMismatch_throws() {
// Given — metadata declares a dictionary_size that does not match the dict buffer's
// actual byte size; the decoder must fail fast rather than digest a malformed
// dictionary (the Rust reference enforces the same invariant).
byte[] dict = "common-zstd-dictionary-content-for-test".getBytes(StandardCharsets.UTF_8);
int[] values = {1, 2, 3};
byte[] raw = toLeBytes(values);
byte[] compressed = compressWithDict(raw, dict);
byte[] meta = new ProtoZstdMetadata(dict.length + 1,
java.util.List.of(new ProtoZstdFrameMetadata(raw.length, values.length))).encode();
DecodeContext ctx = makeDictCtx(meta, DTypes.I32, values.length, dict, compressed);

// When / Then
assertThatThrownBy(() -> DECODER.decode(ctx))
.isInstanceOf(VortexException.class)
.hasMessageContaining("dictionary");
.hasMessageContaining("dictionary size metadata");
}

@Test
Expand Down
Loading