Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.github.dfa1.vortex.writer.zstdspike;

import io.github.dfa1.vortex.core.testing.OhlcData;

import io.airlift.compress.v3.zstd.ZstdJavaCompressor;
import io.airlift.compress.v3.zstd.ZstdJavaDecompressor;

import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/// Spike measurement: FFM-bound system `libzstd` vs pure-Java aircompressor, on realistic OHLC
/// column bytes. Validates the two things that decide whether we can drop aircompressor from core:
///
/// - **Frame compatibility** - native output must decode with aircompressor and vice versa, so the
///   on-disk `vortex.zstd` format is unchanged regardless of which side wrote it.
/// - **Ratio / speed** - whether native is actually the win it should be (configurable levels).
///
/// Every figure in the report is the fastest of `TIMED_RUNS` samples taken after a warm-up, so a
/// single GC pause or scheduling hiccup cannot skew the comparison.
///
/// Skips when no system libzstd is present (e.g. a minimal CI image). Prints a report to stdout;
/// run with `-pl writer -Dtest=ZstdFfmSpikeTest`.
class ZstdFfmSpikeTest {

    private static final int ROWS = 1_000_000;

    /// Untimed iterations executed before the compress measurements.
    private static final int COMPRESS_WARMUP_RUNS = 3;

    /// Untimed iterations executed before the decompress measurements.
    private static final int DECOMPRESS_WARMUP_RUNS = 5;

    /// Timed samples per measurement; the fastest one is reported.
    private static final int TIMED_RUNS = 5;

    /// One compress or decompress invocation to be timed.
    @FunctionalInterface
    private interface Codec {
        byte[] run();
    }

    /// Output of the last timed invocation together with the fastest observed duration.
    private record Timed(byte[] output, long nanos) {
    }

    @Test
    void nativeAndAircompressorAreFrameCompatible() {
        Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found");
        byte[] closeBytes = doubleColumnBytes();

        // Given — aircompressor (pure-Java) and native, each compressing the same column.
        ZstdJavaCompressor airC = new ZstdJavaCompressor();
        byte[] air = aircompress(airC, closeBytes);
        byte[] nat = ZstdNative.compress(closeBytes, 3);

        // When / Then — each side decodes the other's frame back to the original bytes.
        assertThat(ZstdNative.decompress(air, closeBytes.length)).isEqualTo(closeBytes);
        assertThat(airdecompress(nat, closeBytes.length)).isEqualTo(closeBytes);
        // And native round-trips itself.
        assertThat(ZstdNative.decompress(nat, closeBytes.length)).isEqualTo(closeBytes);
    }

    @Test
    void report() {
        Assumptions.assumeTrue(ZstdNative.available(), "system libzstd not found");
        byte[] close = doubleColumnBytes();
        byte[] volume = longColumnBytes();

        System.out.println("\n=== zstd FFM spike — " + ROWS + " rows ===");
        measure("close (F64)", close);
        measure("volume (I64)", volume);
        System.out.println();
    }

    /// Prints compress and decompress ratio/throughput for one column.
    ///
    /// @param label heading printed above the column's figures
    /// @param src raw little-endian column bytes to compress
    private static void measure(String label, byte[] src) {
        ZstdJavaCompressor airC = new ZstdJavaCompressor();

        // Warm up both paths so the JIT / native call sites settle before timing.
        for (int i = 0; i < COMPRESS_WARMUP_RUNS; i++) {
            aircompress(airC, src);
            ZstdNative.compress(src, 3);
            ZstdNative.compress(src, 9);
        }

        Timed air = bestOf(() -> aircompress(airC, src));
        Timed nat3 = bestOf(() -> ZstdNative.compress(src, 3));
        Timed nat9 = bestOf(() -> ZstdNative.compress(src, 9));

        System.out.printf("%n%s (raw %,d B, best of %d runs)%n", label, src.length, TIMED_RUNS);
        System.out.println(" compress:");
        row("aircompressor (Java)", src.length, air.output().length, air.nanos());
        row("native level 3", src.length, nat3.output().length, nat3.nanos());
        row("native level 9", src.length, nat9.output().length, nat9.nanos());

        // Decompress is the read hot path. Both decode the same level-3 native frame (frame format
        // is identical), so this is a like-for-like decode-speed comparison.
        byte[] frame = nat3.output();
        for (int i = 0; i < DECOMPRESS_WARMUP_RUNS; i++) {
            airdecompress(frame, src.length);
            ZstdNative.decompress(frame, src.length);
        }
        Timed airDec = bestOf(() -> airdecompress(frame, src.length));
        Timed natDec = bestOf(() -> ZstdNative.decompress(frame, src.length));

        // A throughput figure is only meaningful if the decode actually reproduced the input.
        assertThat(airDec.output()).isEqualTo(src);
        assertThat(natDec.output()).isEqualTo(src);

        System.out.println(" decompress (same L3 frame):");
        row("aircompressor (Java)", src.length, frame.length, airDec.nanos());
        row("native", src.length, frame.length, natDec.nanos());
    }

    /// Runs `codec` `TIMED_RUNS` times and keeps the fastest wall-clock duration.
    private static Timed bestOf(Codec codec) {
        byte[] output = null;
        long best = Long.MAX_VALUE;
        for (int i = 0; i < TIMED_RUNS; i++) {
            long start = System.nanoTime();
            output = codec.run();
            best = Math.min(best, System.nanoTime() - start);
        }
        return new Timed(output, best);
    }

    /// Prints one report line: compressed size, compression ratio and throughput over the raw size.
    private static void row(String name, int raw, int compressed, long ns) {
        double ratio = (double) raw / compressed;
        double mbPerSec = (raw / 1_048_576.0) / (ns / 1_000_000_000.0);
        System.out.printf(" %-22s %,10d B %5.2fx %7.1f MB/s%n", name, compressed, ratio, mbPerSec);
    }

    /// Compresses `src` with the given aircompressor instance and trims the buffer to the frame.
    private static byte[] aircompress(ZstdJavaCompressor c, byte[] src) {
        int max = c.maxCompressedLength(src.length);
        byte[] dst = new byte[max];
        int n = c.compress(src, 0, src.length, dst, 0, max);
        return Arrays.copyOf(dst, n);
    }

    /// Decompresses `src` with aircompressor into a buffer of `decompressedSize` bytes.
    private static byte[] airdecompress(byte[] src, int decompressedSize) {
        byte[] out = new byte[decompressedSize];
        int n = new ZstdJavaDecompressor().decompress(src, 0, src.length, out, 0, out.length);
        return Arrays.copyOf(out, n);
    }

    /// Generates the data set and returns its first batch.
    private static OhlcData.Batch firstBatch() {
        List<OhlcData.Batch> batches = OhlcData.generate(ROWS, ROWS);
        return batches.getFirst();
    }

    /// The `close` column of the first generated batch as little-endian 8-byte doubles.
    private static byte[] doubleColumnBytes() {
        double[] close = firstBatch().close();
        ByteBuffer bb = ByteBuffer.allocate(close.length * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        for (double v : close) {
            bb.putDouble(v);
        }
        return bb.array();
    }

    /// The `volume` column of the first generated batch as little-endian 8-byte longs.
    private static byte[] longColumnBytes() {
        long[] volume = firstBatch().volume();
        ByteBuffer bb = ByteBuffer.allocate(volume.length * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        for (long v : volume) {
            bb.putLong(v);
        }
        return bb.array();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.github.dfa1.vortex.writer.zstdspike;

import java.lang.foreign.Arena;
import java.lang.foreign.FunctionDescriptor;
import java.lang.foreign.Linker;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.SymbolLookup;
import java.lang.invoke.MethodHandle;

import static java.lang.foreign.ValueLayout.ADDRESS;
import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static java.lang.foreign.ValueLayout.JAVA_INT;
import static java.lang.foreign.ValueLayout.JAVA_LONG;

/// Spike: a thin FFM binding to the system `libzstd` (no JNI, no `Unsafe`).
///
/// Binds the C entry points we need - `ZSTD_compressBound`, `ZSTD_compress`, `ZSTD_decompress`,
/// `ZSTD_isError`, `ZSTD_getErrorName` - via [Linker] downcalls. The library is resolved from a few
/// well-known paths (override with `-Dzstd.lib.path=...`); [#available()] reports whether it loaded
/// so callers can fall back to the pure-Java path when no native lib is present. Calling
/// `compress` or `decompress` without a loaded library fails with an [IllegalStateException].
///
/// This is throwaway measurement code - it copies through heap `byte[]` for an apples-to-apples
/// comparison with aircompressor. A real encoder/decoder would work straight on arena segments.
final class ZstdNative {

    private static final Linker LINKER = Linker.nativeLinker();
    private static final SymbolLookup LOOKUP = load();

    // All handles are null when LOOKUP is null; requireAvailable() guards every use.
    private static final MethodHandle COMPRESS_BOUND = bind(
            "ZSTD_compressBound", FunctionDescriptor.of(JAVA_LONG, JAVA_LONG));
    private static final MethodHandle COMPRESS = bind(
            "ZSTD_compress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG, JAVA_INT));
    private static final MethodHandle DECOMPRESS = bind(
            "ZSTD_decompress", FunctionDescriptor.of(JAVA_LONG, ADDRESS, JAVA_LONG, ADDRESS, JAVA_LONG));
    private static final MethodHandle IS_ERROR = bind(
            "ZSTD_isError", FunctionDescriptor.of(JAVA_INT, JAVA_LONG));
    private static final MethodHandle GET_ERROR_NAME = bind(
            "ZSTD_getErrorName", FunctionDescriptor.of(ADDRESS, JAVA_LONG));

    private ZstdNative() {
    }

    /// Whether a libzstd was found and loaded.
    static boolean available() {
        return LOOKUP != null;
    }

    /// Compresses `src` into a single zstd frame.
    ///
    /// @param src bytes to compress
    /// @param level zstd compression level, passed straight to `ZSTD_compress`
    /// @return the compressed frame, trimmed to its exact size
    /// @throws IllegalStateException if libzstd is not loaded or the native call fails
    static byte[] compress(byte[] src, int level) {
        requireAvailable();
        try (Arena arena = Arena.ofConfined()) {
            MemorySegment in = arena.allocate(src.length);
            MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length);
            long bound = (long) COMPRESS_BOUND.invokeExact((long) src.length);
            MemorySegment out = arena.allocate(bound);
            long n = (long) COMPRESS.invokeExact(out, bound, in, (long) src.length, level);
            checkError(n, "ZSTD_compress");
            return copyOut(out, n);
        } catch (IllegalStateException | Error e) {
            // Already carries the precise reason (or is a JVM-level failure): do not re-wrap.
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException("zstd compress failed", t);
        }
    }

    /// Decompresses one zstd frame.
    ///
    /// @param src the compressed frame
    /// @param decompressedSize capacity of the output buffer; must be at least the decoded size
    /// @return the decoded bytes, trimmed to the size reported by `ZSTD_decompress`
    /// @throws IllegalStateException if libzstd is not loaded or the native call fails
    static byte[] decompress(byte[] src, int decompressedSize) {
        requireAvailable();
        try (Arena arena = Arena.ofConfined()) {
            MemorySegment in = arena.allocate(src.length);
            MemorySegment.copy(src, 0, in, JAVA_BYTE, 0, src.length);
            MemorySegment out = arena.allocate(decompressedSize);
            long n = (long) DECOMPRESS.invokeExact(out, (long) decompressedSize, in, (long) src.length);
            checkError(n, "ZSTD_decompress");
            return copyOut(out, n);
        } catch (IllegalStateException | Error e) {
            // Already carries the precise reason (or is a JVM-level failure): do not re-wrap.
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException("zstd decompress failed", t);
        }
    }

    /// Fails fast with a clear message instead of dereferencing a null method handle.
    private static void requireAvailable() {
        if (LOOKUP == null) {
            throw new IllegalStateException(
                    "system libzstd not found; check available() first or set -Dzstd.lib.path");
        }
    }

    /// Copies the first `n` bytes of `out` to the heap; rejects sizes that do not fit an `int`.
    private static byte[] copyOut(MemorySegment out, long n) {
        byte[] result = new byte[Math.toIntExact(n)];
        MemorySegment.copy(out, JAVA_BYTE, 0, result, 0, result.length);
        return result;
    }

    /// Throws if `code` is a zstd error code, including the library's own description of it.
    private static void checkError(long code, String fn) throws Throwable {
        int err = (int) IS_ERROR.invokeExact(code);
        if (err != 0) {
            throw new IllegalStateException(
                    fn + " returned an error (code " + code + ": " + errorName(code) + ")");
        }
    }

    /// Text returned by `ZSTD_getErrorName` for `code`.
    @SuppressWarnings("restricted") // reinterpret: reading the C string behind the returned pointer
    private static String errorName(long code) throws Throwable {
        MemorySegment name = (MemorySegment) GET_ERROR_NAME.invokeExact(code);
        if (name.address() == 0L) {
            return "unknown";
        }
        // The downcall yields a zero-length segment; widen it so the NUL terminator can be found.
        return name.reinterpret(Long.MAX_VALUE).getString(0);
    }

    @SuppressWarnings("restricted") // FFM native binding: downcalls into libzstd are this class's purpose
    private static MethodHandle bind(String name, FunctionDescriptor descriptor) {
        if (LOOKUP == null) {
            return null;
        }
        return LOOKUP.find(name)
                .map(addr -> LINKER.downcallHandle(addr, descriptor))
                .orElseThrow(() -> new IllegalStateException("libzstd missing symbol " + name));
    }

    /// Tries each candidate location in order and returns the first that loads, or `null` if none do.
    @SuppressWarnings("restricted") // FFM native binding: loading libzstd is this class's purpose
    private static SymbolLookup load() {
        String[] candidates = {
            System.getProperty("zstd.lib.path"),
            "/opt/homebrew/lib/libzstd.dylib",
            "/usr/local/lib/libzstd.dylib",
            "/usr/lib/x86_64-linux-gnu/libzstd.so.1",
            "/lib/x86_64-linux-gnu/libzstd.so.1",
            "/usr/lib/aarch64-linux-gnu/libzstd.so.1",
            "libzstd.so.1",
            "zstd"
        };
        for (String candidate : candidates) {
            if (candidate == null) {
                continue; // override property not set
            }
            try {
                return SymbolLookup.libraryLookup(candidate, Arena.global());
            } catch (IllegalArgumentException _) {
                // Not at this path / not loadable; try the next candidate.
            }
        }
        return null;
    }
}
Loading