Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@
- [ ] Create website
- build something like hardwood.dev but for vortex files

## Testing

- [ ] **Finish OHLC test-data dedup** — the random-walk generator is single-sourced in
`core.testing.OhlcData` (core test-jar). `integration`'s `OhlcGenerator` is now a thin adapter
that maps `OhlcData.Batch` to its own `OhlcBatch` (`symbols`/`dates` field names) only to avoid
churning the JNI/Arrow callers. Align fully: drop `OhlcGenerator`/`OhlcBatch`, switch the
integration callers (`FileSizeComparisonIntegrationTest`, `JavaWritesRustReadsIntegrationTest`)
to `OhlcData.Batch` directly so there is one shape, not two. Verify with the JNI integration
suite (needs vortex-jni native libs).

## Performance

- [ ] **Benchmark publishing** — drop CI workflow, add `bench-publish` script; see [ADR-0006](docs/adr/0006-benchmark-publishing.md).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.github.dfa1.vortex.calcite;

import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/// Branch coverage for [VortexAggregatePushDownRule]: each query either rewrites to a single-row
/// `LogicalValues` (answerable from zone-map stats) or is left with its `Aggregate`/`TableScan`
/// intact (the rule must abandon — wrong stats would give a wrong answer).
class AggregateRuleBranchTest {

private static final int ROWS = 30_000;
private static final int CHUNK = 10_000;

@TempDir
static Path tmp;
private static SchemaPlus schema;

@BeforeAll
static void writeFile() throws Exception {
Path file = tmp.resolve("ohlc.vortex");
OhlcGenerator.write(file, ROWS, CHUNK);
SchemaPlus root = Frameworks.createRootSchema(true);
schema = root.add("vtx", new VortexSchema(Map.of("ohlc", file)));
}

@Test
void countStar_noProjectPath_rewritesToValues() {
// Given a bare COUNT(*) — Aggregate(TableScan), the NO_PROJECT operand with project == null
// When / Then — answered from the footer row count
assertThat(optimize("select count(*) from ohlc")).contains("LogicalValues").doesNotContain("Aggregate");
}

@Test
void countColumn_withProjectPath_rewritesToValues() {
// Given COUNT(volume) — Aggregate(Project(TableScan)); COUNT(col) = rows − nulls from stats
// When / Then
assertThat(optimize("select count(volume) from ohlc")).contains("LogicalValues").doesNotContain("Aggregate");
}

@Test
void sum_hasNoZoneStat_abandonsRewrite() {
// Given SUM(volume) — no SUM zone statistic exists, so evaluate() yields null and the whole
// rewrite is abandoned
// When / Then — the Aggregate survives for the normal scan path
assertThat(optimize("select sum(volume) from ohlc")).contains("Aggregate");
}

@Test
void minOnNonNumericColumn_abandonsRewrite() {
// Given MIN(symbol) over a VARCHAR column — the stat value is non-numeric, numericLiteral
// returns null, the rewrite is abandoned
// When / Then
assertThat(optimize("select min(symbol) from ohlc")).contains("Aggregate");
}

@Test
void minOverComputedExpression_abandonsRewrite() {
// Given MIN(low + 1) — the projected input is an expression, not a bare column ref, so
// resolveColumn returns null and the rewrite is abandoned
// When / Then
assertThat(optimize("select min(low + 1) from ohlc")).contains("Aggregate");
}

@Test
void groupedAggregate_isLeftUntouched() {
// Given a GROUP BY — group count != 0, the rule returns immediately
// When / Then — Aggregate stays
assertThat(optimize("select symbol, max(high) from ohlc group by symbol")).contains("Aggregate");
}

private static String optimize(String sql) {
FrameworkConfig config = Frameworks.newConfigBuilder()
.defaultSchema(schema)
.parserConfig(SqlParser.config().withUnquotedCasing(Casing.UNCHANGED))
.build();
Planner planner = Frameworks.getPlanner(config);
try {
SqlNode parsed = planner.parse(sql);
RelNode logical = planner.rel(planner.validate(parsed)).rel;
HepProgram program = new HepProgramBuilder()
.addRuleCollection(VortexAggregatePushDownRule.RULES)
.build();
HepPlanner hep = new HepPlanner(program);
hep.setRoot(logical);
return RelOptUtil.toString(hep.findBestExp());
} catch (Exception e) {
throw new IllegalStateException("planning failed for: " + sql, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.github.dfa1.vortex.calcite;

import io.github.dfa1.vortex.core.model.DType;
import io.github.dfa1.vortex.writer.VortexWriter;
import io.github.dfa1.vortex.writer.WriteOptions;

import org.apache.calcite.jdbc.CalciteConnection;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

/// Drives [VortexTable]'s filter push-down (`scan(root, filters, projects)`) through a real Calcite
/// JDBC planner: every supported `WHERE` comparison must be translated into a zone-map [RowFilter]
/// (so the scan can prune chunks) while Calcite still returns the exact rows. Predicates the
/// translator does not understand must be left untouched, not break the query.
class FilterPushDownTest {

// Two chunks of three rows so the pushed RowFilter has chunks to (potentially) prune.
private static final DType.Struct SCHEMA = DType.structBuilder()
.field("i64", DType.I64)
.field("i32", DType.I32)
.field("f64", DType.F64)
.field("s", DType.UTF8)
.field("b", DType.BOOL)
.build();

@TempDir
static Path tmp;
private static Path file;

@BeforeAll
static void write() throws Exception {
file = tmp.resolve("filter.vortex");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var w = VortexWriter.create(ch, SCHEMA, WriteOptions.defaults())) {
w.writeChunk(Map.of(
"i64", new long[]{1000L, 2000L, 3000L},
"i32", new int[]{100, 200, 300},
"f64", new double[]{1.0, 2.0, 3.0},
"s", new String[]{"a", "b", "c"},
"b", new boolean[]{true, false, true}));
w.writeChunk(Map.of(
"i64", new long[]{4000L, 5000L, 6000L},
"i32", new int[]{400, 500, 600},
"f64", new double[]{4.0, 5.0, 6.0},
"s", new String[]{"d", "e", "f"},
"b", new boolean[]{false, true, false}));
}
}

@ParameterizedTest(name = "[{index}] WHERE {0} -> {1} rows")
@CsvSource({
// every comparison kind the RowFilter translator supports, over a Long column
"i64 = 1000, 1", // EQUALS -> RowFilter.Eq
"i64 <> 1000, 5", // NOT_EQUALS -> RowFilter.Neq
"i64 < 3000, 2", // LESS_THAN -> RowFilter.Lt
"i64 <= 3000, 3", // LESS_THAN_EQ -> RowFilter.Lte
"i64 > 4000, 2", // GREATER_THAN -> RowFilter.Gt
"i64 >= 4000, 3", // GREATER_EQ -> RowFilter.Gte
"s = 'a', 1", // Utf8 literal coercion
"f64 > 3.0, 3", // floating literal coercion
// multiple conjuncts arrive as separate filters -> RowFilter.and over the list
"i64 > 1000 AND i32 < 600, 4",
// bare boolean ref is not a RexCall -> not pushed, query still exact
"b, 3",
// comparison on a BOOLEAN column has no zone-map coercion -> not pushed, still exact
"b = true, 3"
})
void whereClauseIsPushedAndRowsStayExact(String where, int expected) throws Exception {
// Given a Calcite JDBC connection over the Vortex file
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) {
conn.unwrap(CalciteConnection.class).getRootSchema()
.add("vtx", new VortexSchema(Map.of("data", file)));

// When the filtered query runs (Calcite pushes the predicate into VortexTable.scan)
int rows = 0;
try (Statement st = conn.createStatement();
ResultSet rs = st.executeQuery("select i64 from vtx.data where " + where)) {
while (rs.next()) {
rows++;
}
}

// Then the row count is exact regardless of whether the predicate was pushed
assertThat(rows).isEqualTo(expected);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,52 @@ void table_totalRowsAndStats() {
assertThat(table.statsOf("i64")).isNotNull();
}

@Test
void enumerator_resetUnsupportedAndCloseReleasesOpenChunk() {
// Given a scan positioned inside the first chunk
Enumerator<Object[]> en = new VortexTable(file).scan(null, List.of(), null).enumerator();
assertThat(en.moveNext()).isTrue();

// When / Then — reset is unsupported, and close must release the still-open chunk cleanly
assertThatThrownBy(en::reset).isInstanceOf(UnsupportedOperationException.class);
en.close();
}

@Nested
class MissingFile {

// A path that does not exist makes VortexReader.open throw IOException, which every entry
// point wraps as UncheckedIOException — these are the file-open failure branches.
private final VortexTable table = new VortexTable(tmp.resolve("does-not-exist.vortex"));

@Test
void getRowType_wrapsOpenFailure() {
// When / Then
assertThatThrownBy(() -> table.getRowType(new JavaTypeFactoryImpl()))
.isInstanceOf(java.io.UncheckedIOException.class);
}

@Test
void statsOf_wrapsOpenFailure() {
// When / Then
assertThatThrownBy(() -> table.statsOf("i64"))
.isInstanceOf(java.io.UncheckedIOException.class);
}

@Test
void totalRows_wrapsOpenFailure() {
// When / Then
assertThatThrownBy(table::totalRows).isInstanceOf(java.io.UncheckedIOException.class);
}

@Test
void scan_wrapsOpenFailure() {
// When / Then — the enumerator constructor opens the reader and wraps the failure
assertThatThrownBy(() -> table.scan(null, List.of(), null).enumerator())
.isInstanceOf(java.io.UncheckedIOException.class);
}
}

@Nested
class Schema {

Expand Down Expand Up @@ -187,5 +233,38 @@ void floatColumn_sumIsDouble() throws Exception {
assertThat(s.avg()).isEqualTo(2.25);
}
}

@Test
void narrowIntColumn_sumsViaIntArrayIntoLong() throws Exception {
// Given / When — i32 decodes to IntArray, summed into a long (exact)
try (VortexReader reader = VortexReader.open(file, registry())) {
VortexAggregates.Summary s = VortexAggregates.of(reader, "i32");

// Then
assertThat(s.sum()).isInstanceOf(Long.class).isEqualTo(600L); // 100+200+300
}
}

@Test
void floatColumn_sumsViaFloatArrayIntoDouble() throws Exception {
// Given / When — f32 decodes to FloatArray, accumulated into a double
try (VortexReader reader = VortexReader.open(file, registry())) {
VortexAggregates.Summary s = VortexAggregates.of(reader, "f32");

// Then
assertThat(s.sum()).isInstanceOf(Double.class);
assertThat(s.sum().doubleValue()).isEqualTo(7.5); // 1.5+2.5+3.5
}
}

@Test
void nonNumericColumn_throws() throws Exception {
// Given / When / Then — a UTF8 column has no numeric array branch
try (VortexReader reader = VortexReader.open(file, registry())) {
assertThatThrownBy(() -> VortexAggregates.of(reader, "s"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("not a numeric column");
}
}
}
}
Loading
Loading