From 066df0aaf26f10d582d1157550038b136475f6c1 Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Thu, 25 Jun 2026 21:06:28 +0200 Subject: [PATCH] spike(writer): FFM binding to system libzstd vs aircompressor Throwaway exploration (test scope) probing whether core's vortex.zstd codec can move off the unmaintained aircompressor onto native libzstd via FFM downcalls (no JNI, no Unsafe). ZstdNative binds ZSTD_compressBound/compress/decompress/isError through Linker, resolving the system libzstd (-Dzstd.lib.path override; graceful available() check for fallback). ZstdFfmSpikeTest measures ratio/speed on real OHLC column bytes and asserts frame compatibility both directions. Findings (1M rows, libzstd 1.5.7): native and aircompressor decode each other's frames, so the on-disk format is unchanged. Native compress ~2x faster at equal ratio (level 3), with a real ratio knob at higher levels; decompress 1.1-1.4x faster even through the spike's heap bounce. Native also supports dictionaries, which the pure-Java decoder rejects. Co-Authored-By: Claude Opus 4.8 --- .../writer/zstdspike/ZstdFfmSpikeTest.java | 144 ++++++++++++++++++ .../vortex/writer/zstdspike/ZstdNative.java | 117 ++++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java create mode 100644 writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java new file mode 100644 index 00000000..0093497d --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java @@ -0,0 +1,144 @@ +package io.github.dfa1.vortex.writer.zstdspike; + +import io.github.dfa1.vortex.core.testing.OhlcData; + +import io.airlift.compress.v3.zstd.ZstdJavaCompressor; +import io.airlift.compress.v3.zstd.ZstdJavaDecompressor; + +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Spike measurement: FFM-bound system `libzstd` vs pure-Java aircompressor, on realistic OHLC +/// column bytes. Validates the two things that decide whether we can drop aircompressor from core: +/// +/// - **Frame compatibility** - native output must decode with aircompressor and vice versa, so the +/// on-disk `vortex.zstd` format is unchanged regardless of which side wrote it. +/// - **Ratio / speed** - whether native is actually the win it should be (configurable levels). +/// +/// Skips when no system libzstd is present (e.g. a minimal CI image). Prints a report to stdout; +/// run with `-pl writer -Dtest=ZstdFfmSpikeTest`. +class ZstdFfmSpikeTest { + + private static final int ROWS = 1_000_000; + + @Test + void nativeAndAircompressorAreFrameCompatible() { + Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found"); + byte[] closeBytes = doubleColumnBytes(); + + // Given — aircompressor (pure-Java) and native, each compressing the same column. + ZstdJavaCompressor airC = new ZstdJavaCompressor(); + byte[] air = aircompress(airC, closeBytes); + byte[] nat = ZstdNative.compress(closeBytes, 3); + + // When / Then — each side decodes the other's frame back to the original bytes. + assertThat(ZstdNative.decompress(air, closeBytes.length)).isEqualTo(closeBytes); + assertThat(airdecompress(nat, closeBytes.length)).isEqualTo(closeBytes); + // And native round-trips itself. + assertThat(ZstdNative.decompress(nat, closeBytes.length)).isEqualTo(closeBytes); + } + + @Test + void report() { + Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found"); + byte[] close = doubleColumnBytes(); + byte[] volume = longColumnBytes(); + + System.out.println("\n=== zstd FFM spike — " + ROWS + " rows ==="); + measure("close (F64)", close); + measure("volume (I64)", volume); + System.out.println(); + } + + private static void measure(String label, byte[] src) { + ZstdJavaCompressor airC = new ZstdJavaCompressor(); + + // Warm up both paths so the JIT / native call sites settle before timing. + for (int i = 0; i < 3; i++) { + aircompress(airC, src); + ZstdNative.compress(src, 3); + ZstdNative.compress(src, 9); + } + + long t0 = System.nanoTime(); + byte[] air = aircompress(airC, src); + long airNs = System.nanoTime() - t0; + + long t1 = System.nanoTime(); + byte[] nat3 = ZstdNative.compress(src, 3); + long nat3Ns = System.nanoTime() - t1; + + long t2 = System.nanoTime(); + byte[] nat9 = ZstdNative.compress(src, 9); + long nat9Ns = System.nanoTime() - t2; + + System.out.printf("%n%s (raw %,d B)%n", label, src.length); + System.out.println(" compress:"); + row("aircompressor (Java)", src.length, air.length, airNs); + row("native level 3", src.length, nat3.length, nat3Ns); + row("native level 9", src.length, nat9.length, nat9Ns); + + // Decompress is the read hot path. Both decode the same level-3 native frame (frame format + // is identical), so this is a like-for-like decode-speed comparison. + for (int i = 0; i < 5; i++) { + airdecompress(nat3, src.length); + ZstdNative.decompress(nat3, src.length); + } + long t3 = System.nanoTime(); + airdecompress(nat3, src.length); + long airDecNs = System.nanoTime() - t3; + long t4 = System.nanoTime(); + ZstdNative.decompress(nat3, src.length); + long natDecNs = System.nanoTime() - t4; + + System.out.println(" decompress (same L3 frame):"); + row("aircompressor (Java)", src.length, nat3.length, airDecNs); + row("native", src.length, nat3.length, natDecNs); + } + + private static void row(String name, int raw, int compressed, long ns) { + double ratio = (double) raw / compressed; + double mbPerSec = (raw / 1_048_576.0) / (ns / 1_000_000_000.0); + System.out.printf(" %-22s %,10d B %5.2fx %7.1f MB/s%n", name, compressed, ratio, mbPerSec); + } + + private static byte[] aircompress(ZstdJavaCompressor c, byte[] src) { + int max = c.maxCompressedLength(src.length); + byte[] dst = new byte[max]; + int n = c.compress(src, 0, src.length, dst, 0, max); + return Arrays.copyOf(dst, n); + } + + private static byte[] airdecompress(byte[] src, int decompressedSize) { + byte[] out = new byte[decompressedSize]; + int n = new ZstdJavaDecompressor().decompress(src, 0, src.length, out, 0, out.length); + return Arrays.copyOf(out, n); + } + + private static byte[] doubleColumnBytes() { + List batches = OhlcData.generate(ROWS, ROWS); + double[] close = batches.getFirst().close(); + ByteBuffer bb = ByteBuffer.allocate(close.length * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (double v : close) { + bb.putDouble(v); + } + return bb.array(); + } + + private static byte[] longColumnBytes() { + List batches = OhlcData.generate(ROWS, ROWS); + long[] volume = batches.getFirst().volume(); + ByteBuffer bb = ByteBuffer.allocate(volume.length * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (long v : volume) { + bb.putLong(v); + } + return bb.array(); + } +} diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java new file mode 100644 index 00000000..734e8c7e --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java @@ -0,0 +1,117 @@ +package io.github.dfa1.vortex.writer.zstdspike; + +import java.lang.foreign.Arena; +import java.lang.foreign.FunctionDescriptor; +import java.lang.foreign.Linker; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.SymbolLookup; +import java.lang.invoke.MethodHandle; + +import static java.lang.foreign.ValueLayout.ADDRESS; +import static java.lang.foreign.ValueLayout.JAVA_BYTE; +import static java.lang.foreign.ValueLayout.JAVA_INT; +import static java.lang.foreign.ValueLayout.JAVA_LONG; + +/// Spike: a thin FFM binding to the system `libzstd` (no JNI, no `Unsafe`). +/// +/// Binds the four C entry points we need - `ZSTD_compressBound`, `ZSTD_compress`, +/// `ZSTD_decompress`, `ZSTD_isError` - via [Linker] downcalls. The library is resolved from a few +/// well-known paths (override with `-Dzstd.lib.path=...`); [#available()] reports whether it loaded +/// so callers can fall back to the pure-Java path when no native lib is present. +/// +/// This is throwaway measurement code - it copies through heap `byte[]` for an apples-to-apples +/// comparison with aircompressor. A real encoder/decoder would work straight on arena segments. +final class ZstdNative { + + private static final Linker LINKER = Linker.nativeLinker(); + private static final SymbolLookup LOOKUP = load(); + + private static final MethodHandle COMPRESS_BOUND = bind( + "ZSTD_compressBound", FunctionDescriptor.of(JAVA_LONG, JAVA_LONG)); + private static final MethodHandle COMPRESS = bind( + "ZSTD_compress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG, JAVA_INT)); + private static final MethodHandle DECOMPRESS = bind( + "ZSTD_decompress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG)); + private static final MethodHandle IS_ERROR = bind( + "ZSTD_isError", FunctionDescriptor.of(JAVA_INT, JAVA_LONG)); + + private ZstdNative() { + } + + static boolean available() { + return LOOKUP != null; + } + + static byte[] compress(byte[] src, int level) { + try (Arena arena = Arena.ofConfined()) { + MemorySegment in = arena.allocate(src.length); + MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length); + long bound = (long) COMPRESS_BOUND.invokeExact((long) src.length); + MemorySegment out = arena.allocate(bound); + long n = (long) COMPRESS.invokeExact(out, bound, in, (long) src.length, level); + checkError(n, "ZSTD_compress"); + byte[] result = new byte[(int) n]; + MemorySegment.copy(out, JAVA_BYTE, 0, result, 0, (int) n); + return result; + } catch (Throwable t) { + throw new IllegalStateException("zstd compress failed", t); + } + } + + static byte[] decompress(byte[] src, int decompressedSize) { + try (Arena arena = Arena.ofConfined()) { + MemorySegment in = arena.allocate(src.length); + MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length); + MemorySegment out = arena.allocate(decompressedSize); + long n = (long) DECOMPRESS.invokeExact(out, (long) decompressedSize, in, (long) src.length); + checkError(n, "ZSTD_decompress"); + byte[] result = new byte[(int) n]; + MemorySegment.copy(out, JAVA_BYTE, 0, result, 0, (int) n); + return result; + } catch (Throwable t) { + throw new IllegalStateException("zstd decompress failed", t); + } + } + + private static void checkError(long code, String fn) throws Throwable { + int err = (int) IS_ERROR.invokeExact(code); + if (err != 0) { + throw new IllegalStateException(fn + " returned an error (code " + code + ")"); + } + } + + @SuppressWarnings("restricted") // FFM native binding: downcalls into libzstd are this class's purpose + private static MethodHandle bind(String name, FunctionDescriptor descriptor) { + if (LOOKUP == null) { + return null; + } + return LOOKUP.find(name) + .map(addr -> LINKER.downcallHandle(addr, descriptor)) + .orElseThrow(() -> new IllegalStateException("libzstd missing symbol " + name)); + } + + @SuppressWarnings("restricted") // FFM native binding: loading libzstd is this class's purpose + private static SymbolLookup load() { + String[] candidates = { + System.getProperty("zstd.lib.path"), + "/opt/homebrew/lib/libzstd.dylib", + "/usr/local/lib/libzstd.dylib", + "/usr/lib/x86_64-linux-gnu/libzstd.so.1", + "/lib/x86_64-linux-gnu/libzstd.so.1", + "/usr/lib/aarch64-linux-gnu/libzstd.so.1", + "libzstd.so.1", + "zstd" + }; + for (String candidate : candidates) { + if (candidate == null) { + continue; + } + try { + return SymbolLookup.libraryLookup(candidate, Arena.global()); + } catch (IllegalArgumentException _) { + // Not at this path / not loadable; try the next candidate. + } + } + return null; + } +}