Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ private static ReadRegistry primitiveRegistry() {
.build();
}

@Test
void writeSegments_are64ByteAligned(@TempDir Path tmp) throws IOException {
    // Given three chunks of three rows each over the "id" and "value" columns, so the encoded
    // buffers are not 64-byte multiples. The writer is expected to pad ahead of every segment so
    // that each buffer starts on a 64-byte boundary (Arrow-compatible); a wrong modulus or a
    // skipped pad would leave at least one segment offset misaligned.
    WriteOptions options = new WriteOptions(3, false, 0.90, 0, false, false);
    Path target = tmp.resolve("aligned.vtx");
    try (var channel = FileChannel.open(target, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         var sut = VortexWriter.create(channel, SCHEMA, options)) {
        for (int chunk = 0; chunk < 3; chunk++) {
            long firstId = chunk * 3L;
            long[] ids = {firstId, firstId + 1, firstId + 2};
            double[] values = {chunk + 0.5, chunk + 1.5, chunk + 2.5};
            sut.writeChunk(Map.of("id", ids, "value", values));
        }
    }

    // When / Then the footer lists at least one segment and each one starts on a 64-byte boundary
    try (VortexReader reader = VortexReader.open(target)) {
        var specs = reader.footer().segmentSpecs();
        assertThat(specs).isNotEmpty();
        for (var spec : specs) {
            var offset = spec.offset();
            assertThat(offset % 64).as("segment offset %d aligned", offset).isZero();
        }
    }
}

// ── writeChunk validation ─────────────────────────────────────────────────

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.github.dfa1.vortex.writer;

import static io.github.dfa1.vortex.core.io.PTypeIO.LE_FLOAT;
import static io.github.dfa1.vortex.core.io.PTypeIO.LE_INT;
import static io.github.dfa1.vortex.core.io.PTypeIO.LE_SHORT;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

Expand All @@ -9,6 +11,7 @@
import io.github.dfa1.vortex.reader.Layout;
import io.github.dfa1.vortex.reader.SegmentSpec;
import io.github.dfa1.vortex.reader.VortexReader;
import io.github.dfa1.vortex.reader.array.DoubleArray;
import io.github.dfa1.vortex.reader.array.LongArray;
import io.github.dfa1.vortex.reader.array.MaskedArray;
import io.github.dfa1.vortex.reader.array.StructArray;
Expand Down Expand Up @@ -382,6 +385,105 @@ void primitiveDictColumn_emitsNumericMinMaxZoneMapWrappingDict(@TempDir Path tmp
}
}

@Test
void zoneMaps_f64StatsPayloadDecodesPerZoneMinMaxSum(@TempDir Path tmp) throws IOException {
    // Given a zone-mapped F64 file written as 3 chunks of 4 rows. Stats are otherwise only
    // exercised for I64, which leaves the float path (scalarDouble and the F64 arms of
    // statColumn/sumColumn) uncovered: a decode yielding 0.0 or reading the wrong scalar field
    // would go unnoticed. Every value ends in .5 so that a truncating (int) decode shows up too.
    DType.Struct schema = new DType.Struct(List.of("v"), List.of(DType.F64), false);
    WriteOptions options = new WriteOptions(4, true, 0.90, 0, true, false);
    Path target = tmp.resolve("zoned-f64.vtx");
    try (var channel = FileChannel.open(target, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         var sut = VortexWriter.create(channel, schema, options)) {
        for (int zone = 0; zone < 3; zone++) {
            double[] rows = new double[4];
            for (int row = 0; row < 4; row++) {
                rows[row] = zone * 4.0 + row + 0.5;
            }
            sut.writeChunk(Map.of("v", rows));
        }
    }

    // When / Then each zone's min, max and sum decode to the exact doubles that were written
    try (VortexReader reader = VortexReader.open(target)) {
        var firstChild = reader.layout().children().get(0);
        Layout zonesFlat = firstChild.children().get(1);
        var segmentIndex = zonesFlat.segments().getFirst();
        SegmentSpec spec = reader.footer().segmentSpecs().get(segmentIndex);
        try (Arena arena = Arena.ofConfined()) {
            StructArray stats = (StructArray) reader.decodeFlatSegment(spec, f64StatsTableDtype(), 3, arena);
            MaskedArray maskedMax = (MaskedArray) stats.field("max");
            DoubleArray max = (DoubleArray) maskedMax.inner();
            MaskedArray maskedMin = (MaskedArray) stats.field("min");
            DoubleArray min = (DoubleArray) maskedMin.inner();
            MaskedArray maskedSum = (MaskedArray) stats.field("sum");
            DoubleArray sum = (DoubleArray) maskedSum.inner();
            for (int zone = 0; zone < 3; zone++) {
                double base = zone * 4.0;
                double expectedMin = base + 0.5;
                double expectedMax = base + 3.5;
                // Same left-to-right summation as the four rows written for this zone.
                double expectedSum = expectedMin + (base + 1.5) + (base + 2.5) + expectedMax;
                assertThat(min.getDouble(zone)).as("min zone %d", zone).isEqualTo(expectedMin);
                assertThat(max.getDouble(zone)).as("max zone %d", zone).isEqualTo(expectedMax);
                assertThat(sum.getDouble(zone)).as("sum zone %d", zone).isEqualTo(expectedSum);
            }
        }
    }
}

@ParameterizedTest
@EnumSource(value = PType.class, names = {"I8", "I16", "I32", "F32"})
void zoneMaps_perTypeStatsDecodePerZoneMinMax(PType ptype, @TempDir Path tmp) throws IOException {
    // Given a column of `ptype` written as two chunks: zone 0 = [0, 1], zone 1 = [2, 3].
    // everyPrimitiveType only verifies that the layout is zoned; this test pins the decoded
    // per-zone MIN/MAX for the I8/I16/I32 and F32 statColumn arms (plus the f32 scalar field read
    // in scalarDouble), none of which the I64/F64 value tests reach.
    DType.Struct schema = new DType.Struct(List.of("v"), List.of(new DType.Primitive(ptype, false)), false);
    WriteOptions options = new WriteOptions(2, true, 0.90, 0, false, false);
    Path target = tmp.resolve("ptype-stats-" + ptype + ".vtx");
    try (var channel = FileChannel.open(target, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         var sut = VortexWriter.create(channel, schema, options)) {
        sut.writeChunk(Map.of("v", sample(ptype, 0)));
        sut.writeChunk(Map.of("v", sample(ptype, 2)));
    }

    // When / Then the decoded min/max of each zone equal the values that were written
    try (VortexReader reader = VortexReader.open(target)) {
        var firstChild = reader.layout().children().get(0);
        Layout zonesFlat = firstChild.children().get(1);
        var segmentIndex = zonesFlat.segments().getFirst();
        SegmentSpec spec = reader.footer().segmentSpecs().get(segmentIndex);
        try (Arena arena = Arena.ofConfined()) {
            StructArray stats = (StructArray) reader.decodeFlatSegment(spec, perTypeStatsTableDtype(ptype), 2, arena);
            MaskedArray maskedMin = (MaskedArray) stats.field("min");
            MemorySegment min = maskedMin.inner().materialize(arena);
            MaskedArray maskedMax = (MaskedArray) stats.field("max");
            MemorySegment max = maskedMax.inner().materialize(arena);
            for (int zone = 0; zone < 2; zone++) {
                // Zone z was written as [2z, 2z + 1].
                double lowest = zone * 2.0;
                assertThat(readStat(min, ptype, zone)).as("min zone %d", zone).isEqualTo(lowest);
                assertThat(readStat(max, ptype, zone)).as("max zone %d", zone).isEqualTo(lowest + 1.0);
            }
        }
    }
}

/// Fetches entry `idx` of a materialised min/max segment and widens it to `double`, so one
/// assertion shape serves all of the fixed-width primitive types handled here.
///
/// I8 is read at byte offset `idx`, I16 at `idx * 2`, and I32/F32 at `idx * 4`. Any other
/// `ptype` is rejected with an [AssertionError] carrying that ptype.
private static double readStat(MemorySegment seg, PType ptype, int idx) {
    final long slot = idx;
    switch (ptype) {
        case I8 -> {
            return seg.get(ValueLayout.JAVA_BYTE, slot);
        }
        case I16 -> {
            return seg.get(LE_SHORT, slot * Short.BYTES);
        }
        case I32 -> {
            return seg.get(LE_INT, slot * Integer.BYTES);
        }
        case F32 -> {
            return seg.get(LE_FLOAT, slot * Float.BYTES);
        }
        default -> throw new AssertionError(ptype);
    }
}

/// Builds the stats-table dtype for a numeric column of `ptype`: nullable MAX/MIN in the column's
/// own ptype, a nullable SUM widened to U64 (unsigned ints), I64 (signed ints) or F64 (anything
/// else) to mirror [VortexWriter] `zoneSumDtype`, and a nullable U64 NULL_COUNT. The two
/// `*_is_truncated` flags are plain BOOL.
private static DType.Struct perTypeStatsTableDtype(PType ptype) {
    DType extremum = new DType.Primitive(ptype, true);
    PType sumPType = switch (ptype) {
        case U8, U16, U32, U64 -> PType.U64;
        case I8, I16, I32, I64 -> PType.I64;
        default -> PType.F64;
    };
    DType sum = new DType.Primitive(sumPType, true);
    DType nullCount = new DType.Primitive(PType.U64, true);
    return new DType.Struct(
            List.of("max", "max_is_truncated", "min", "min_is_truncated", "sum", "null_count"),
            List.of(extremum, DType.BOOL, extremum, DType.BOOL, sum, nullCount),
            false);
}

/// Two values starting at `base` in the storage shape for `ptype`.
private static Object sample(PType ptype, int base) {
return switch (ptype) {
Expand Down Expand Up @@ -416,6 +518,16 @@ private static DType.Struct numericStatsTableDtype() {
false);
}

/// Builds the stats-table dtype for a numeric F64 column: MAX, MIN and SUM are nullable F64,
/// the two `*_is_truncated` flags are BOOL, and NULL_COUNT is a nullable U64.
private static DType.Struct f64StatsTableDtype() {
    DType stat = new DType.Primitive(PType.F64, true);
    DType nullCount = new DType.Primitive(PType.U64, true);
    return new DType.Struct(
            List.of("max", "max_is_truncated", "min", "min_is_truncated", "sum", "null_count"),
            List.of(stat, DType.BOOL, stat, DType.BOOL, stat, nullCount),
            false);
}

/// Same as [#statsTableDtype()] but with Utf8 (string) MAX/MIN columns.
private static DType.Struct utf8StatsTableDtype() {
DType nullableUtf8 = new DType.Utf8(true);
Expand Down
Loading