diff --git a/CHANGELOG.md b/CHANGELOG.md index 593fcc24..93f38d4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `DType.isUnsigned()` — `true` for the unsigned integer primitives (`U8`–`U64`), `false` otherwise. ([#159](https://github.com/dfa1/vortex-java/issues/159)) +### Changed + +- The `vortex.zstd` encoding now compresses and decompresses through `io.github.dfa1.zstd:zstd` (FFM bindings to the native `libzstd`) instead of `io.airlift:aircompressor-v3`. Consumers of `vortex.zstd` must declare the `zstd` dependency plus the `zstd-native-` artifact for their platform (e.g. `zstd-native-osx-aarch64`, `zstd-native-linux-x86_64`). + ### Fixed - Zone-map pruning now compares filter values in the *column's* type domain rather than by the boxed value's type. A predicate whose value is boxed at a different width (e.g. `Integer` on an `I64` column) — or any value on a `U64` column — previously pruned nothing and silently degraded to a full scan; it now prunes correctly (unsigned columns by unsigned order). As part of this, a filter value genuinely incomparable to its column (e.g. a `String` against a numeric column) now raises `VortexException` during the scan instead of silently disabling pruning — a behaviour change for callers that relied on the previous silent full scan. ([#159](https://github.com/dfa1/vortex-java/issues/159)) diff --git a/bom/pom.xml b/bom/pom.xml index ffcf2f22..ed5e7a58 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -18,11 +18,11 @@ + the zstd FFM bindings explicitly; the BOM pins the tested version. --> - io.airlift - aircompressor-v3 - ${aircompressor.version} + io.github.dfa1.zstd + zstd + ${zstd.version} io.github.dfa1.vortex diff --git a/calcite/pom.xml b/calcite/pom.xml index c8073a94..2314aae2 100644 --- a/calcite/pom.xml +++ b/calcite/pom.xml @@ -40,8 +40,8 @@ test - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd test diff --git a/cli/pom.xml b/cli/pom.xml index 1eefe162..9033bc73 100644 --- a/cli/pom.xml +++ b/cli/pom.xml @@ -34,8 +34,8 @@ vortex-inspector - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd test diff --git a/integration/pom.xml b/integration/pom.xml index 0f58d97e..b4b6e5e4 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -19,8 +19,8 @@ - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd test diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 1b2fd5b5..772f0152 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -24,8 +24,8 @@ - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd test diff --git a/parquet/pom.xml b/parquet/pom.xml index ad7fa777..699ecaf2 100644 --- a/parquet/pom.xml +++ b/parquet/pom.xml @@ -35,10 +35,10 @@ vortex-reader test - + - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd test diff --git a/performance/pom.xml b/performance/pom.xml index c6864238..b7e8de92 100644 --- a/performance/pom.xml +++ b/performance/pom.xml @@ -56,8 +56,8 @@ compile - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd diff --git a/pom.xml b/pom.xml index fe735a46..625eb99c 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,8 @@ 25 UTF-8 - 3.6 + 0.1 + 4.3.0 1.0.0.CR2 @@ -190,9 +191,9 @@ ${hardwood.version} - io.airlift - aircompressor-v3 - ${aircompressor.version} + io.github.dfa1.zstd + zstd + ${zstd.version} com.github.luben @@ -435,6 +436,62 @@ + + + zstd-native-osx-aarch64 + + + mac + aarch64 + + + + + io.github.dfa1.zstd + zstd-native-osx-aarch64 + ${zstd.version} + runtime + + + + + zstd-native-linux-x86_64 + + + unix + linux + amd64 + + + + + io.github.dfa1.zstd + zstd-native-linux-x86_64 + ${zstd.version} + runtime + + + + + zstd-native-windows-x86_64 + + + windows + amd64 + + + + + io.github.dfa1.zstd + zstd-native-windows-x86_64 + ${zstd.version} + runtime + + + diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java index 79ca0faa..5effe141 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/ZstdEncodingDecoder.java @@ -18,12 +18,11 @@ import io.github.dfa1.vortex.reader.array.MaterializedShortArray; import io.github.dfa1.vortex.reader.array.VarBinArray; -import io.airlift.compress.v3.zstd.ZstdDecompressor; -import io.airlift.compress.v3.zstd.ZstdJavaDecompressor; +import io.github.dfa1.zstd.ZstdDecompressCtx; import java.io.IOException; +import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; /// Read-only decoder for `vortex.zstd`. public final class ZstdEncodingDecoder implements EncodingDecoder { @@ -52,7 +51,7 @@ public Array decode(DecodeContext ctx) { } if (meta.dictionary_size() != 0) { throw new VortexException(EncodingId.VORTEX_ZSTD, - "dictionary-compressed Zstd segments are not supported (pure-Java decoder)"); + "dictionary-compressed Zstd segments are not supported"); } BoolArray validity = null; @@ -149,25 +148,38 @@ private static MemorySegment decompressFrames( int frameCount, long totalUncompressed ) { + // Zero-copy: decompress each native frame straight into its slice of the arena output, + // no heap byte[] bounce. The mmap'd file buffers are already native; the scratch arena + // only services the heap segments unit tests hand in. MemorySegment out = ctx.arena().allocate(totalUncompressed); - ZstdDecompressor decompressor = new ZstdJavaDecompressor(); - long outOffset = 0; - for (int i = 0; i < frameCount; i++) { - MemorySegment frameSeg = ctx.buffer(i); - byte[] compressed = frameSeg.toArray(ValueLayout.JAVA_BYTE); - int uncompSize = (int) meta.frames().get(i).uncompressed_size(); - byte[] temp = new byte[uncompSize]; - int written = decompressor.decompress(compressed, 0, compressed.length, temp, 0, uncompSize); - if (written != uncompSize) { - throw new VortexException(EncodingId.VORTEX_ZSTD, - "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + try (ZstdDecompressCtx dctx = new ZstdDecompressCtx(); + Arena scratch = Arena.ofConfined()) { + long outOffset = 0; + for (int i = 0; i < frameCount; i++) { + MemorySegment src = asNative(ctx.buffer(i), scratch); + int uncompSize = (int) meta.frames().get(i).uncompressed_size(); + long written = dctx.decompress(out.asSlice(outOffset, uncompSize), src); + if (written != uncompSize) { + throw new VortexException(EncodingId.VORTEX_ZSTD, + "frame " + i + ": expected " + uncompSize + " bytes, got " + written); + } + outOffset += uncompSize; } - MemorySegment.copy(MemorySegment.ofArray(temp), 0, out, outOffset, uncompSize); - outOffset += uncompSize; } return out; } + /// Returns `seg` unchanged when it is already native (the production mmap path); otherwise + /// copies it into `scratch` so the zero-copy native API can read it. + private static MemorySegment asNative(MemorySegment seg, Arena scratch) { + if (seg.isNative()) { + return seg; + } + MemorySegment copy = scratch.allocate(Math.max(seg.byteSize(), 1)); + MemorySegment.copy(seg, 0, copy, 0, seg.byteSize()); + return copy.asSlice(0, seg.byteSize()); + } + private static Array buildArray(DType dtype, long n, MemorySegment decompressed, DecodeContext ctx) { if (dtype instanceof DType.Primitive dt) { return buildPrimitive(dt, n, decompressed); diff --git a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java index fd0a06f3..81b28e23 100644 --- a/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/VortexHttpReaderIT.java @@ -114,9 +114,9 @@ void scan_publishedFixture_decodesAllRows(String fixture) throws Exception { assertThat(totalRows).isGreaterThan(0); } - // The published zstd.vortex fixture is dictionary-compressed; the pure-Java decoder has no - // Zstd dictionary support and must fail fast with a clear message rather than mis-decode. - // Tracked by https://github.com/dfa1/vortex-java/issues/104 (upstream airlift/aircompressor#119). + // The published zstd.vortex fixture is dictionary-compressed; the decoder has no Zstd + // dictionary support and must fail fast with a clear message rather than mis-decode. + // Tracked by https://github.com/dfa1/vortex-java/issues/104. @Test void scan_zstdVortex_rejectsDictionaryCompression() throws Exception { // Given diff --git a/writer/pom.xml b/writer/pom.xml index 7c5b3a9a..8d143d03 100644 --- a/writer/pom.xml +++ b/writer/pom.xml @@ -21,8 +21,8 @@ vortex-core - io.airlift - aircompressor-v3 + io.github.dfa1.zstd + zstd true diff --git a/writer/src/main/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoder.java b/writer/src/main/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoder.java index 8065b376..5126db40 100644 --- a/writer/src/main/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoder.java +++ b/writer/src/main/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoder.java @@ -1,7 +1,6 @@ package io.github.dfa1.vortex.writer.encode; -import io.airlift.compress.v3.zstd.ZstdCompressor; -import io.airlift.compress.v3.zstd.ZstdJavaCompressor; +import io.github.dfa1.zstd.ZstdCompressCtx; import io.github.dfa1.vortex.core.model.DType; import io.github.dfa1.vortex.core.model.PType; import io.github.dfa1.vortex.core.error.VortexException; @@ -14,7 +13,6 @@ import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.List; /// Write-only encoder for `vortex.zstd`. @@ -37,47 +35,49 @@ public boolean accepts(DType dtype) { @Override public EncodeResult encode(DType dtype, Object data, EncodeContext ctx) { if (dtype instanceof DType.Primitive dt) { - return encodePrimitive(dt, data); + return encodePrimitive(dt, data, ctx.arena()); } if (dtype instanceof DType.Utf8 || dtype instanceof DType.Binary) { - return encodeVarBin((String[]) data); + return encodeVarBin((String[]) data, ctx.arena()); } throw new VortexException(EncodingId.VORTEX_ZSTD, "unsupported dtype: " + dtype); } - private static EncodeResult encodePrimitive(DType.Primitive dt, Object data) { - MemorySegment raw = primitiveToLeBytes(dt.ptype(), data, Arena.ofAuto()); + private static EncodeResult encodePrimitive(DType.Primitive dt, Object data, Arena arena) { + MemorySegment raw = primitiveToLeBytes(dt.ptype(), data, arena); long n = primitiveLength(dt.ptype(), data); - byte[] rawBytes = raw.toArray(ValueLayout.JAVA_BYTE); - return buildResult(rawBytes, n); + return buildResult(raw, n, arena); } - private static EncodeResult encodeVarBin(String[] strings) { - byte[] raw = buildLengthPrefixed(strings); - return buildResult(raw, strings.length); + private static EncodeResult encodeVarBin(String[] strings, Arena arena) { + MemorySegment raw = buildLengthPrefixed(strings, arena); + return buildResult(raw, strings.length, arena); } - private static EncodeResult buildResult(byte[] raw, long n) { - byte[] compressed = compress(raw); + private static EncodeResult buildResult(MemorySegment raw, long n, Arena arena) { + // Zero-copy: compress the arena-native raw segment straight into another arena segment, + // no heap byte[] bounce on either side. The compressed slice is owned by the caller arena. + MemorySegment compressed; + try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { + compressed = cctx.compress(arena, raw); + } byte[] meta = new ProtoZstdMetadata( 0, - java.util.List.of(new ProtoZstdFrameMetadata(raw.length, n)) + List.of(new ProtoZstdFrameMetadata(raw.byteSize(), n)) ).encode(); EncodeNode root = new EncodeNode(EncodingId.VORTEX_ZSTD, MemorySegment.ofArray(meta), new EncodeNode[0], new int[]{0}); - return new EncodeResult(root, List.of(MemorySegment.ofArray(compressed)), null, null); - } - - private static byte[] compress(byte[] input) { - ZstdCompressor compressor = new ZstdJavaCompressor(); - byte[] out = new byte[compressor.maxCompressedLength(input.length)]; - int len = compressor.compress(input, 0, input.length, out, 0, out.length); - return Arrays.copyOf(out, len); + return new EncodeResult(root, List.of(compressed), null, null); } private static MemorySegment primitiveToLeBytes(PType ptype, Object data, Arena arena) { return switch (ptype) { - case I8, U8 -> MemorySegment.ofArray((byte[]) data); + case I8, U8 -> { + byte[] arr = (byte[]) data; + MemorySegment seg = arena.allocate(arr.length); + MemorySegment.copy(arr, 0, seg, ValueLayout.JAVA_BYTE, 0, arr.length); + yield seg; + } case I16, U16, F16 -> { short[] arr = (short[]) data; MemorySegment seg = arena.allocate((long) arr.length * 2, 2); @@ -132,23 +132,21 @@ private static long primitiveLength(PType ptype, Object data) { }; } - private static byte[] buildLengthPrefixed(String[] strings) { + private static MemorySegment buildLengthPrefixed(String[] strings, Arena arena) { int total = 0; byte[][] encoded = new byte[strings.length][]; for (int i = 0; i < strings.length; i++) { encoded[i] = strings[i].getBytes(StandardCharsets.UTF_8); total += 4 + encoded[i].length; } - try (Arena scratch = Arena.ofConfined()) { - MemorySegment seg = scratch.allocate(total > 0 ? total : 1); - long pos = 0; - for (byte[] bytes : encoded) { - seg.set(PTypeIO.LE_INT, pos, bytes.length); - pos += 4; - MemorySegment.copy(MemorySegment.ofArray(bytes), 0, seg, pos, bytes.length); - pos += bytes.length; - } - return seg.asSlice(0, total).toArray(ValueLayout.JAVA_BYTE); + MemorySegment seg = arena.allocate(total > 0 ? total : 1); + long pos = 0; + for (byte[] bytes : encoded) { + seg.set(PTypeIO.LE_INT, pos, bytes.length); + pos += 4; + MemorySegment.copy(MemorySegment.ofArray(bytes), 0, seg, pos, bytes.length); + pos += bytes.length; } + return seg.asSlice(0, total); } } diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java index 0b06499d..e952c2d8 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/ZstdEncodingEncoderTest.java @@ -1,7 +1,6 @@ package io.github.dfa1.vortex.writer.encode; -import io.airlift.compress.v3.zstd.ZstdCompressor; -import io.airlift.compress.v3.zstd.ZstdJavaCompressor; +import io.github.dfa1.zstd.Zstd; import io.github.dfa1.vortex.core.model.DType; import io.github.dfa1.vortex.core.model.PType; import io.github.dfa1.vortex.core.error.VortexException; @@ -29,7 +28,6 @@ import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -146,10 +144,7 @@ private static ArrayNode toArrayNode(EncodeNode enc) { } private static byte[] compress(byte[] input) { - ZstdCompressor compressor = new ZstdJavaCompressor(); - byte[] out = new byte[compressor.maxCompressedLength(input.length)]; - int len = compressor.compress(input, 0, input.length, out, 0, out.length); - return Arrays.copyOf(out, len); + return Zstd.compress(input); } private static byte[] metaNoDict(long[] uncompressedSizes, long[] nValues) {