diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java new file mode 100644 index 00000000..0093497d --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdFfmSpikeTest.java @@ -0,0 +1,144 @@ +package io.github.dfa1.vortex.writer.zstdspike; + +import io.github.dfa1.vortex.core.testing.OhlcData; + +import io.airlift.compress.v3.zstd.ZstdJavaCompressor; +import io.airlift.compress.v3.zstd.ZstdJavaDecompressor; + +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Spike measurement: FFM-bound system `libzstd` vs pure-Java aircompressor, on realistic OHLC +/// column bytes. Validates the two things that decide whether we can drop aircompressor from core: +/// +/// - **Frame compatibility** - native output must decode with aircompressor and vice versa, so the +/// on-disk `vortex.zstd` format is unchanged regardless of which side wrote it. +/// - **Ratio / speed** - whether native is actually the win it should be (configurable levels). +/// +/// Skips when no system libzstd is present (e.g. a minimal CI image). Prints a report to stdout; +/// run with `-pl writer -Dtest=ZstdFfmSpikeTest`. +class ZstdFfmSpikeTest { + + private static final int ROWS = 1_000_000; + + @Test + void nativeAndAircompressorAreFrameCompatible() { + Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found"); + byte[] closeBytes = doubleColumnBytes(); + + // Given — aircompressor (pure-Java) and native, each compressing the same column. + ZstdJavaCompressor airC = new ZstdJavaCompressor(); + byte[] air = aircompress(airC, closeBytes); + byte[] nat = ZstdNative.compress(closeBytes, 3); + + // When / Then — each side decodes the other's frame back to the original bytes. + assertThat(ZstdNative.decompress(air, closeBytes.length)).isEqualTo(closeBytes); + assertThat(airdecompress(nat, closeBytes.length)).isEqualTo(closeBytes); + // And native round-trips itself. + assertThat(ZstdNative.decompress(nat, closeBytes.length)).isEqualTo(closeBytes); + } + + @Test + void report() { + Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found"); + byte[] close = doubleColumnBytes(); + byte[] volume = longColumnBytes(); + + System.out.println("\n=== zstd FFM spike — " + ROWS + " rows ==="); + measure("close (F64)", close); + measure("volume (I64)", volume); + System.out.println(); + } + + private static void measure(String label, byte[] src) { + ZstdJavaCompressor airC = new ZstdJavaCompressor(); + + // Warm up both paths so the JIT / native call sites settle before timing. + for (int i = 0; i < 3; i++) { + aircompress(airC, src); + ZstdNative.compress(src, 3); + ZstdNative.compress(src, 9); + } + + long t0 = System.nanoTime(); + byte[] air = aircompress(airC, src); + long airNs = System.nanoTime() - t0; + + long t1 = System.nanoTime(); + byte[] nat3 = ZstdNative.compress(src, 3); + long nat3Ns = System.nanoTime() - t1; + + long t2 = System.nanoTime(); + byte[] nat9 = ZstdNative.compress(src, 9); + long nat9Ns = System.nanoTime() - t2; + + System.out.printf("%n%s (raw %,d B)%n", label, src.length); + System.out.println(" compress:"); + row("aircompressor (Java)", src.length, air.length, airNs); + row("native level 3", src.length, nat3.length, nat3Ns); + row("native level 9", src.length, nat9.length, nat9Ns); + + // Decompress is the read hot path. Both decode the same level-3 native frame (frame format + // is identical), so this is a like-for-like decode-speed comparison. + for (int i = 0; i < 5; i++) { + airdecompress(nat3, src.length); + ZstdNative.decompress(nat3, src.length); + } + long t3 = System.nanoTime(); + airdecompress(nat3, src.length); + long airDecNs = System.nanoTime() - t3; + long t4 = System.nanoTime(); + ZstdNative.decompress(nat3, src.length); + long natDecNs = System.nanoTime() - t4; + + System.out.println(" decompress (same L3 frame):"); + row("aircompressor (Java)", src.length, nat3.length, airDecNs); + row("native", src.length, nat3.length, natDecNs); + } + + private static void row(String name, int raw, int compressed, long ns) { + double ratio = (double) raw / compressed; + double mbPerSec = (raw / 1_048_576.0) / (ns / 1_000_000_000.0); + System.out.printf(" %-22s %,10d B %5.2fx %7.1f MB/s%n", name, compressed, ratio, mbPerSec); + } + + private static byte[] aircompress(ZstdJavaCompressor c, byte[] src) { + int max = c.maxCompressedLength(src.length); + byte[] dst = new byte[max]; + int n = c.compress(src, 0, src.length, dst, 0, max); + return Arrays.copyOf(dst, n); + } + + private static byte[] airdecompress(byte[] src, int decompressedSize) { + byte[] out = new byte[decompressedSize]; + int n = new ZstdJavaDecompressor().decompress(src, 0, src.length, out, 0, out.length); + return Arrays.copyOf(out, n); + } + + private static byte[] doubleColumnBytes() { + List batches = OhlcData.generate(ROWS, ROWS); + double[] close = batches.getFirst().close(); + ByteBuffer bb = ByteBuffer.allocate(close.length * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (double v : close) { + bb.putDouble(v); + } + return bb.array(); + } + + private static byte[] longColumnBytes() { + List batches = OhlcData.generate(ROWS, ROWS); + long[] volume = batches.getFirst().volume(); + ByteBuffer bb = ByteBuffer.allocate(volume.length * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (long v : volume) { + bb.putLong(v); + } + return bb.array(); + } +} diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java new file mode 100644 index 00000000..734e8c7e --- /dev/null +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/zstdspike/ZstdNative.java @@ -0,0 +1,117 @@ +package io.github.dfa1.vortex.writer.zstdspike; + +import java.lang.foreign.Arena; +import java.lang.foreign.FunctionDescriptor; +import java.lang.foreign.Linker; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.SymbolLookup; +import java.lang.invoke.MethodHandle; + +import static java.lang.foreign.ValueLayout.ADDRESS; +import static java.lang.foreign.ValueLayout.JAVA_BYTE; +import static java.lang.foreign.ValueLayout.JAVA_INT; +import static java.lang.foreign.ValueLayout.JAVA_LONG; + +/// Spike: a thin FFM binding to the system `libzstd` (no JNI, no `Unsafe`). +/// +/// Binds the four C entry points we need - `ZSTD_compressBound`, `ZSTD_compress`, +/// `ZSTD_decompress`, `ZSTD_isError` - via [Linker] downcalls. The library is resolved from a few +/// well-known paths (override with `-Dzstd.lib.path=...`); [#available()] reports whether it loaded +/// so callers can fall back to the pure-Java path when no native lib is present. +/// +/// This is throwaway measurement code - it copies through heap `byte[]` for an apples-to-apples +/// comparison with aircompressor. A real encoder/decoder would work straight on arena segments. +final class ZstdNative { + + private static final Linker LINKER = Linker.nativeLinker(); + private static final SymbolLookup LOOKUP = load(); + + private static final MethodHandle COMPRESS_BOUND = bind( + "ZSTD_compressBound", FunctionDescriptor.of(JAVA_LONG, JAVA_LONG)); + private static final MethodHandle COMPRESS = bind( + "ZSTD_compress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG, JAVA_INT)); + private static final MethodHandle DECOMPRESS = bind( + "ZSTD_decompress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG)); + private static final MethodHandle IS_ERROR = bind( + "ZSTD_isError", FunctionDescriptor.of(JAVA_INT, JAVA_LONG)); + + private ZstdNative() { + } + + static boolean available() { + return LOOKUP != null; + } + + static byte[] compress(byte[] src, int level) { + try (Arena arena = Arena.ofConfined()) { + MemorySegment in = arena.allocate(src.length); + MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length); + long bound = (long) COMPRESS_BOUND.invokeExact((long) src.length); + MemorySegment out = arena.allocate(bound); + long n = (long) COMPRESS.invokeExact(out, bound, in, (long) src.length, level); + checkError(n, "ZSTD_compress"); + byte[] result = new byte[(int) n]; + MemorySegment.copy(out, JAVA_BYTE, 0, result, 0, (int) n); + return result; + } catch (Throwable t) { + throw new IllegalStateException("zstd compress failed", t); + } + } + + static byte[] decompress(byte[] src, int decompressedSize) { + try (Arena arena = Arena.ofConfined()) { + MemorySegment in = arena.allocate(src.length); + MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length); + MemorySegment out = arena.allocate(decompressedSize); + long n = (long) DECOMPRESS.invokeExact(out, (long) decompressedSize, in, (long) src.length); + checkError(n, "ZSTD_decompress"); + byte[] result = new byte[(int) n]; + MemorySegment.copy(out, JAVA_BYTE, 0, result, 0, (int) n); + return result; + } catch (Throwable t) { + throw new IllegalStateException("zstd decompress failed", t); + } + } + + private static void checkError(long code, String fn) throws Throwable { + int err = (int) IS_ERROR.invokeExact(code); + if (err != 0) { + throw new IllegalStateException(fn + " returned an error (code " + code + ")"); + } + } + + @SuppressWarnings("restricted") // FFM native binding: downcalls into libzstd are this class's purpose + private static MethodHandle bind(String name, FunctionDescriptor descriptor) { + if (LOOKUP == null) { + return null; + } + return LOOKUP.find(name) + .map(addr -> LINKER.downcallHandle(addr, descriptor)) + .orElseThrow(() -> new IllegalStateException("libzstd missing symbol " + name)); + } + + @SuppressWarnings("restricted") // FFM native binding: loading libzstd is this class's purpose + private static SymbolLookup load() { + String[] candidates = { + System.getProperty("zstd.lib.path"), + "/opt/homebrew/lib/libzstd.dylib", + "/usr/local/lib/libzstd.dylib", + "/usr/lib/x86_64-linux-gnu/libzstd.so.1", + "/lib/x86_64-linux-gnu/libzstd.so.1", + "/usr/lib/aarch64-linux-gnu/libzstd.so.1", + "libzstd.so.1", + "zstd" + }; + for (String candidate : candidates) { + if (candidate == null) { + continue; + } + try { + return SymbolLookup.libraryLookup(candidate, Arena.global()); + } catch (IllegalArgumentException _) { + // Not at this path / not loadable; try the next candidate. + } + } + return null; + } +}