diff --git a/TODO.md b/TODO.md index 5cf4c394..2068b6e9 100644 --- a/TODO.md +++ b/TODO.md @@ -6,6 +6,16 @@ - [ ] Create website - build something like hardwood.dev but for vortex files +## Testing + +- [ ] **Finish OHLC test-data dedup** — the random-walk generator is single-sourced in + `core.testing.OhlcData` (core test-jar). `integration`'s `OhlcGenerator` is now a thin adapter + that maps `OhlcData.Batch` to its own `OhlcBatch` (`symbols`/`dates` field names) only to avoid + churning the JNI/Arrow callers. Align fully: drop `OhlcGenerator`/`OhlcBatch`, switch the + integration callers (`FileSizeComparisonIntegrationTest`, `JavaWritesRustReadsIntegrationTest`) + to `OhlcData.Batch` directly so there is one shape, not two. Verify with the JNI integration + suite (needs vortex-jni native libs). + ## Performance - [ ] **Benchmark publishing** — drop CI workflow, add `bench-publish` script; see [ADR-0006](docs/adr/0006-benchmark-publishing.md). diff --git a/calcite/pom.xml b/calcite/pom.xml new file mode 100644 index 00000000..c8073a94 --- /dev/null +++ b/calcite/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + io.github.dfa1.vortex + vortex-java + 0.9.1-SNAPSHOT + + + vortex-calcite + + vortex-calcite + Apache Calcite SQL adapter over the Vortex columnar file format (demo: filter/project/aggregate push-down). + + + 1.40.0 + + + + + + io.github.dfa1.vortex + vortex-reader + + + org.apache.calcite + calcite-core + ${calcite.version} + + + + io.github.dfa1.vortex + vortex-core + test-jar + test + + + io.github.dfa1.vortex + vortex-writer + test + + + io.airlift + aircompressor-v3 + test + + + org.junit.jupiter + junit-jupiter + test + + + org.assertj + assertj-core + test + + + diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java new file mode 100644 index 00000000..edabaeb9 --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java @@ -0,0 +1,198 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ArrayStats; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.interpreter.Bindables; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +/// Rewrites a whole-table `MIN`/`MAX`/`COUNT` aggregate over a [VortexTable] into a single-row +/// [LogicalValues] computed from the footer zone-map statistics — answering the query without +/// decoding a single data segment (ADR 0013 §6, ADR 0018 Phase 2). +/// +/// Fires only when it can answer *every* aggregate from statistics: no `GROUP BY`, and each +/// call is `COUNT(*)`, `COUNT(col)`, `MIN(col)`, or `MAX(col)` over a numeric column. Anything +/// else (e.g. `SUM`, a grouped aggregate, `MIN` on a non-numeric column) leaves the plan +/// untouched for the normal scan path. `SUM`/`AVG` join this tier once the writer emits a +/// per-zone `SUM` statistic. +// Calcite 1.40 removed RelRule.Config.EMPTY; the modern RelRule.Config path requires the +// Immutables annotation processor. The classic operand() constructor is deprecated but fully +// supported and far lighter for a single adapter rule — suppression is localized and justified. +@SuppressWarnings("deprecation") +public final class VortexAggregatePushDownRule extends RelOptRule { + + /// Matches `Aggregate(Project(TableScan))` — the shape Calcite produces when columns are + /// selected before aggregation (e.g. `MIN(low)`). + public static final VortexAggregatePushDownRule WITH_PROJECT = new VortexAggregatePushDownRule( + operand(Aggregate.class, operand(Project.class, operand(TableScan.class, none()))), + "VortexAggregatePushDownRule:project"); + + /// Matches `Aggregate(TableScan)` — e.g. a bare `COUNT(*)` with no projected columns. + public static final VortexAggregatePushDownRule NO_PROJECT = new VortexAggregatePushDownRule( + operand(Aggregate.class, operand(TableScan.class, none())), + "VortexAggregatePushDownRule:scan"); + + /// Every rule variant, for registering with a planner in one call. + public static final java.util.List RULES = java.util.List.of(WITH_PROJECT, NO_PROJECT); + + private VortexAggregatePushDownRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate aggregate = call.rel(0); + if (aggregate.getGroupCount() != 0) { + return; + } + // Explicit operands give concrete rels under both Hep and Volcano: rel(1) is either the + // Project (then rel(2) is the scan) or the scan directly. + Project project; + TableScan scan; + if (call.rel(1) instanceof Project p) { + project = p; + scan = call.rel(2); + } else { + project = null; + scan = call.rel(1); + } + VortexTable table = scan.getTable().unwrap(VortexTable.class); + if (table == null) { + return; + } + // Whole-table stats are only valid for a whole-table scan. If a WHERE predicate was pushed + // into the scan (BindableTableScan.filters), answering from stats would ignore it and return + // the wrong MIN/MAX/COUNT — leave the plan to compute it over the filtered rows. + if (scan instanceof Bindables.BindableTableScan bindable && !bindable.filters.isEmpty()) { + return; + } + RelDataType scanRowType = scan.getRowType(); + List scanColumns = scanRowType.getFieldNames(); + + RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder(); + List outTypes = aggregate.getRowType().getFieldList().stream() + .map(f -> f.getType()).toList(); + + List row = new ArrayList<>(); + List calls = aggregate.getAggCallList(); + for (int i = 0; i < calls.size(); i++) { + RexLiteral literal = evaluate(calls.get(i), outTypes.get(i), table, scanColumns, scanRowType, + project, rexBuilder); + if (literal == null) { + return; // an aggregate we can't answer from stats — abandon the rewrite + } + row.add(literal); + } + + RelNode values = LogicalValues.create( + aggregate.getCluster(), aggregate.getRowType(), + ImmutableList.of(ImmutableList.copyOf(row))); + call.transformTo(values); + } + + /// Evaluates one aggregate call from zone-map stats, returning a literal of `outType`, or + /// `null` if it cannot be answered (so the caller abandons the rewrite). + private static RexLiteral evaluate(AggregateCall agg, RelDataType outType, VortexTable table, + List scanColumns, RelDataType scanRowType, + Project project, RexBuilder rexBuilder) { + return switch (agg.getAggregation().getKind()) { + case COUNT -> { + if (agg.getArgList().isEmpty()) { + yield exact(rexBuilder, table.totalRows(), outType); // COUNT(*) + } + String col = resolveColumn(agg.getArgList().getFirst(), scanColumns, project); + if (col == null) { + yield null; + } + Long nullCount = table.statsOf(col).nullCount(); + // COUNT(col) = rows − nulls. Without a NULL_COUNT stat we cannot assume zero nulls + // for a nullable column (we would overcount), so abandon; a non-nullable column has + // no nulls and is safe. + if (nullCount == null && isNullable(scanRowType, col)) { + yield null; + } + long nulls = nullCount == null ? 0L : nullCount; + yield exact(rexBuilder, table.totalRows() - nulls, outType); + } + case MIN, MAX -> { + if (agg.getArgList().size() != 1) { + yield null; + } + String col = resolveColumn(agg.getArgList().getFirst(), scanColumns, project); + if (col == null) { + yield null; + } + ArrayStats stats = table.statsOf(col); + Object value = agg.getAggregation().getKind() == SqlKind.MIN ? stats.min() : stats.max(); + if (value == null) { + // No MIN/MAX stat. A genuine SQL NULL is only correct when the column provably has + // no non-null rows (empty table, or every row null); otherwise the stat is merely + // absent and we must abandon so the scan computes the real value. + long total = table.totalRows(); + Long nullCount = stats.nullCount(); + boolean provablyNoValues = total == 0 || (nullCount != null && nullCount == total); + yield provablyNoValues ? rexBuilder.makeNullLiteral(outType) : null; + } + yield numericLiteral(rexBuilder, value, outType); + } + default -> null; + }; + } + + /// Returns whether column `col` is nullable per the scan's row type. + private static boolean isNullable(RelDataType scanRowType, String col) { + RelDataTypeField field = scanRowType.getField(col, true, false); + return field == null || field.getType().isNullable(); + } + + /// Maps an aggregate input ordinal to a scan column name, looking through a `Project` of input + /// refs when present. Returns `null` if the ordinal is a computed expression, not a column. + private static String resolveColumn(int aggInput, List scanColumns, Project project) { + if (project == null) { + return aggInput < scanColumns.size() ? scanColumns.get(aggInput) : null; + } + RexNode expr = project.getProjects().get(aggInput); + if (expr instanceof RexInputRef ref && ref.getIndex() < scanColumns.size()) { + return scanColumns.get(ref.getIndex()); + } + return null; + } + + private static RexLiteral exact(RexBuilder rexBuilder, long value, RelDataType type) { + return rexBuilder.makeExactLiteral(BigDecimal.valueOf(value), type); + } + + /// Builds a literal for a non-null `MIN`/`MAX` value, supporting only numeric output types + /// (exact and approximate). A non-numeric value yields `null` so the rule abandons the rewrite. + private static RexLiteral numericLiteral(RexBuilder rexBuilder, Object value, RelDataType type) { + if (!(value instanceof Number number)) { + return null; + } + return switch (type.getSqlTypeName()) { + case TINYINT, SMALLINT, INTEGER, BIGINT -> + rexBuilder.makeExactLiteral(BigDecimal.valueOf(number.longValue()), type); + case FLOAT, REAL, DOUBLE, DECIMAL -> + rexBuilder.makeApproxLiteral(BigDecimal.valueOf(number.doubleValue()), type); + default -> null; + }; + } +} diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java new file mode 100644 index 00000000..a9f47ae5 --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java @@ -0,0 +1,125 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ArrayStats; +import io.github.dfa1.vortex.reader.Chunk; +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; +import io.github.dfa1.vortex.reader.array.Array; +import io.github.dfa1.vortex.reader.array.DoubleArray; +import io.github.dfa1.vortex.reader.array.FloatArray; +import io.github.dfa1.vortex.reader.array.IntArray; +import io.github.dfa1.vortex.reader.array.LongArray; + +/// Column aggregates answered with the cheapest available source. +/// +/// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the +/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) has no zone statistic +/// in the current writer, so it falls back to a streaming scan. This split is the whole point +/// of the demo: the stats-backed aggregates are effectively free, and `SUM`/`AVG` show what the +/// next writer increment (emit a per-zone `SUM`) would make free too (ADR 0013 §6). +public final class VortexAggregates { + + /// Where an aggregate's value came from. + public enum Source { + /// Read from zone-map statistics in the footer; no data decoded. + ZONE_STATS_PUSHDOWN, + /// Computed by streaming the column's data segments (no usable statistic). + FULL_SCAN + } + + /// A column's aggregate summary plus where each part was sourced. + /// + /// `sum` is a [Long] for integer columns (exact, matching SQL `SUM(BIGINT)` which also wraps + /// at 2^63) and a [Double] for floating-point columns — never a `double` accumulation of + /// integers, which would lose precision past 2^53. + /// + /// @param column the column name + /// @param min minimum value (boxed `Double`/`Long`), or `null` if unknown + /// @param max maximum value, or `null` if unknown + /// @param count count of non-null rows + /// @param sum sum of values (`Long` for integer columns, `Double` for floats), or `null` + /// @param avg mean (`sum / count`), or `null` if not computed + /// @param minMaxSource source of `min`/`max`/`count` + /// @param sumSource source of `sum`/`avg` + public record Summary(String column, Object min, Object max, long count, Number sum, Double avg, + Source minMaxSource, Source sumSource) { + } + + private VortexAggregates() { + } + + /// Computes `MIN`/`MAX`/`COUNT`/`SUM`/`AVG` for a numeric column, pushing down what the zone + /// statistics allow and scanning only for `SUM`/`AVG`. + /// + /// @param reader an open reader over the file + /// @param column the numeric column name + /// @return the column's aggregate summary + public static Summary of(VortexReader reader, String column) { + ArrayStats stats = reader.columnStats().getOrDefault(column, ArrayStats.empty()); + long totalRows = totalRows(reader); + long nullCount = stats.nullCount() == null ? 0L : stats.nullCount(); + long count = totalRows - nullCount; + + // MIN/MAX/COUNT: straight from footer zone-map stats, no data segment touched. + Object min = stats.min(); + Object max = stats.max(); + + // SUM/AVG: no SUM zone-stat exists today, so stream the column once. Integer columns sum + // into a long (exact); floating columns into a double. + Number sum = scanSum(reader, column); + Double avg = count == 0 ? null : sum.doubleValue() / count; + + return new Summary(column, min, max, count, sum, avg, + Source.ZONE_STATS_PUSHDOWN, Source.FULL_SCAN); + } + + private static long totalRows(VortexReader reader) { + try (ScanIterator scan = reader.scan(ScanOptions.all())) { + long total = 0L; + for (long c : scan.chunkRowCounts()) { + total += c; + } + return total; + } + } + + private static Number scanSum(VortexReader reader, String column) { + long longSum = 0L; + double doubleSum = 0.0; + boolean isFloating = false; + try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) { + while (scan.hasNext()) { + try (Chunk chunk = scan.next()) { + long n = chunk.rowCount(); + switch (chunk.column(column)) { + case LongArray a -> { + for (long i = 0; i < n; i++) { + longSum += a.getLong(i); + } + } + case IntArray a -> { + for (long i = 0; i < n; i++) { + longSum += a.getInt(i); + } + } + case DoubleArray a -> { + isFloating = true; + for (long i = 0; i < n; i++) { + doubleSum += a.getDouble(i); + } + } + case FloatArray a -> { + isFloating = true; + for (long i = 0; i < n; i++) { + doubleSum += a.getFloat(i); + } + } + default -> throw new IllegalArgumentException("not a numeric column: " + column); + } + } + } + } + return isFloating ? Double.valueOf(doubleSum) : Long.valueOf(longSum); + } +} diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java new file mode 100644 index 00000000..df8be495 --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java @@ -0,0 +1,48 @@ +package io.github.dfa1.vortex.calcite; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; + +import java.nio.file.Path; +import java.util.Map; + +/// A Calcite schema exposing one or more Vortex files as SQL tables. +/// +/// Each entry maps a SQL table name to the `.vortex` file backing it: +/// +/// ```java +/// rootSchema.add("vtx", new VortexSchema(Map.of("ohlc", path))); +/// // SELECT symbol, max(high) FROM vtx.ohlc GROUP BY symbol +/// ``` +public final class VortexSchema extends AbstractSchema { + + private final Map tables; + + /// Creates a schema from a map of SQL table name to backing Vortex file. + /// + /// @param files map from table name to the `.vortex` file path + public VortexSchema(Map files) { + this.tables = files.entrySet().stream() + .collect(java.util.stream.Collectors.toUnmodifiableMap( + Map.Entry::getKey, e -> new VortexTable(e.getValue()))); + } + + @Override + protected Map getTableMap() { + return tables; + } + + /// Returns the [VortexTable] registered under `name` — useful for reading push-down + /// instrumentation such as [VortexTable#chunksScannedLastQuery()]. + /// + /// @param name the SQL table name + /// @return the backing table + /// @throws IllegalArgumentException if no table is registered under `name` + public VortexTable table(String name) { + Table t = tables.get(name); + if (!(t instanceof VortexTable vortexTable)) { + throw new IllegalArgumentException("no Vortex table named: " + name); + } + return vortexTable; + } +} diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java new file mode 100644 index 00000000..20f2a402 --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java @@ -0,0 +1,392 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.core.model.DType; +import io.github.dfa1.vortex.reader.Chunk; +import io.github.dfa1.vortex.reader.RowFilter; +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; +import io.github.dfa1.vortex.reader.array.BoolArray; +import io.github.dfa1.vortex.reader.array.DoubleArray; +import io.github.dfa1.vortex.reader.array.FloatArray; +import io.github.dfa1.vortex.reader.array.IntArray; +import io.github.dfa1.vortex.reader.array.LongArray; +import io.github.dfa1.vortex.reader.array.VarBinArray; + +import org.apache.calcite.DataContext; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.schema.ProjectableFilterableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +/// A single Vortex file exposed to Calcite as a flat SQL table with column projection and +/// zone-map filter push-down. +/// +/// Projection (`projects`) is honoured exactly — only the requested columns are decoded and +/// returned. Filters (`filters`) that translate to a [RowFilter] are pushed into the scan for +/// *chunk skipping* via zone-map statistics, but are **left in Calcite's list** rather than +/// consumed: zone-map pruning is approximate (it drops whole chunks that cannot match, not +/// individual rows), so Calcite must still apply the predicate row-by-row for exactness. The +/// win is decoding far fewer chunks when the filter is selective on a clustered column. +public final class VortexTable extends AbstractTable implements ProjectableFilterableTable { + + /// Used only to expand `SEARCH`/`Sarg` predicates (e.g. `BETWEEN`, `IN`) back into ordinary + /// comparison trees the [RowFilter] translation understands. + private static final RexBuilder REX_BUILDER = new RexBuilder(new JavaTypeFactoryImpl()); + + private final Path file; + private final AtomicLong chunksScannedLastQuery = new AtomicLong(); + + /// Creates a table backed by the Vortex file at `file`. + /// + /// @param file path to the `.vortex` file + public VortexTable(Path file) { + this.file = file; + } + + /// Number of chunks actually decoded by the most recent [#scan] — the rest were pruned by + /// zone-map statistics. Used by the demo to show push-down skipping work. + /// + /// @return chunks decoded in the last query + public long chunksScannedLastQuery() { + return chunksScannedLastQuery.get(); + } + + /// Per-column zone-map statistics (global min/max and null count), read from the footer + /// without decoding data. Used by the aggregate push-down rule to answer `MIN`/`MAX`/`COUNT`. + /// + /// @param column the column name + /// @return the column's aggregated statistics + public io.github.dfa1.vortex.reader.ArrayStats statsOf(String column) { + try (VortexReader reader = VortexReader.open(file)) { + return reader.columnStats().getOrDefault(column, io.github.dfa1.vortex.reader.ArrayStats.empty()); + } catch (IOException e) { + throw new UncheckedIOException("cannot read stats of " + file, e); + } + } + + /// Total row count across all chunks, read from chunk metadata without decoding data. + /// + /// @return the number of rows in the file + public long totalRows() { + try (VortexReader reader = VortexReader.open(file); + ScanIterator scan = reader.scan(ScanOptions.all())) { + long total = 0; + for (long c : scan.chunkRowCounts()) { + total += c; + } + return total; + } catch (IOException e) { + throw new UncheckedIOException("cannot count rows of " + file, e); + } + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + DType.Struct struct = struct(); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < struct.fieldNames().size(); i++) { + builder.add(struct.fieldNames().get(i), toSqlType(typeFactory, struct.fieldTypes().get(i))); + } + return builder.build(); + } + + @Override + public Enumerable scan(DataContext root, List filters, int[] projects) { + DType.Struct struct = struct(); + List allNames = struct.fieldNames(); + + // Projection: the columns to decode and emit, in the order Calcite asked for. A null + // projects array means "all columns". + int[] cols = projects != null ? projects : allColumns(allNames.size()); + String[] outNames = new String[cols.length]; + DType[] outTypes = new DType[cols.length]; + for (int i = 0; i < cols.length; i++) { + outNames[i] = allNames.get(cols[i]); + outTypes[i] = struct.fieldTypes().get(cols[i]); + } + + // Filter push-down: build a RowFilter from the predicates we understand (for chunk skip). + // Do NOT remove anything from `filters` — pruning is approximate, Calcite re-checks rows. + Optional pushed = toRowFilter(filters, allNames, struct.fieldTypes()); + + // The scan must include any column the filter prunes on, even when it is not projected — + // chunk pruning reads that column's zone-map stats. Output still emits only outNames. + java.util.LinkedHashSet scanColumns = new java.util.LinkedHashSet<>(List.of(outNames)); + ScanOptions options; + if (pushed.isPresent()) { + collectColumns(pushed.get(), scanColumns); + options = ScanOptions.columns(scanColumns.toArray(String[]::new)).withFilter(pushed.get()); + } else { + options = ScanOptions.columns(outNames); + } + + // Stream rows lazily: decode one chunk at a time and yield a fresh row, so rows die young + // (in G1's young gen) instead of piling a whole-result List into the old gen — the + // dominant cost an async-profiler run showed for the full-scan path (~72% in GC). + ScanOptions scanOptions = options; + return new AbstractEnumerable<>() { + @Override + public Enumerator enumerator() { + return new VortexEnumerator(scanOptions, outNames, outTypes); + } + }; + } + + /// Streaming [Enumerator] over a Vortex scan: advances chunk by chunk, decoding each requested + /// column once per chunk and materialising one `Object[]` row per [#moveNext()]. Rows are not + /// retained, so the working set stays at one chunk rather than the whole result. + private final class VortexEnumerator implements Enumerator { + + private final String[] names; + private final DType[] types; + private final VortexReader reader; + private final ScanIterator scan; + private Chunk chunk; + private Object[] columns; + private long rowInChunk; + private long chunkRows; + private Object[] current; + + private VortexEnumerator(ScanOptions options, String[] names, DType[] types) { + this.names = names; + this.types = types; + chunksScannedLastQuery.set(0); + VortexReader openedReader = null; + try { + openedReader = VortexReader.open(file); + this.reader = openedReader; + this.scan = openedReader.scan(options); + } catch (IOException e) { + closeQuietly(openedReader); + throw new UncheckedIOException("cannot scan " + file, e); + } catch (RuntimeException e) { + closeQuietly(openedReader); + throw e; + } + } + + private void closeQuietly(VortexReader r) { + if (r != null) { + r.close(); + } + } + + @Override + public Object[] current() { + return current; + } + + @Override + public boolean moveNext() { + while (true) { + if (chunk != null && rowInChunk < chunkRows) { + Object[] row = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + row[c] = value(columns[c], types[c], rowInChunk); + } + rowInChunk++; + current = row; + return true; + } + if (chunk != null) { + chunk.close(); + chunk = null; + } + if (!scan.hasNext()) { + return false; + } + chunk = scan.next(); + chunksScannedLastQuery.incrementAndGet(); + chunkRows = chunk.rowCount(); + rowInChunk = 0; + columns = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + columns[c] = chunk.column(names[c]); + } + } + } + + @Override + public void reset() { + throw new UnsupportedOperationException("VortexEnumerator does not support reset"); + } + + @Override + public void close() { + try { + if (chunk != null) { + chunk.close(); + } + } finally { + scan.close(); + reader.close(); + } + } + } + + private DType.Struct struct() { + try (VortexReader reader = VortexReader.open(file)) { + if (!(reader.dtype() instanceof DType.Struct s)) { + throw new IllegalStateException("top-level Vortex dtype is not a struct: " + reader.dtype()); + } + return s; + } catch (IOException e) { + throw new UncheckedIOException("cannot read schema of " + file, e); + } + } + + private static int[] allColumns(int n) { + int[] all = new int[n]; + for (int i = 0; i < n; i++) { + all[i] = i; + } + return all; + } + + private static Object value(Object array, DType type, long r) { + return switch (type) { + case DType.Primitive p -> switch (p.ptype()) { + case F64 -> ((DoubleArray) array).getDouble(r); + case F32 -> (double) ((FloatArray) array).getFloat(r); + case I64, U64 -> ((LongArray) array).getLong(r); + case I32, U32, I16, U16, I8, U8 -> ((IntArray) array).getInt(r); + default -> throw new IllegalStateException("unsupported ptype: " + p.ptype()); + }; + case DType.Utf8 _ -> ((VarBinArray) array).getString(r); + case DType.Bool _ -> ((BoolArray) array).getBoolean(r); + default -> throw new IllegalStateException("unsupported column dtype: " + type); + }; + } + + /// Translates the Calcite predicates into a zone-map [RowFilter], keeping only the comparisons + /// we can prune on (`=`, `<>`, `<`, `<=`, `>`, `>=` between a column and a literal, plus `AND`). + /// Predicates we don't understand are simply not pushed — Calcite still applies them. + private static Optional toRowFilter(List filters, List names, List types) { + List pushed = new ArrayList<>(); + for (RexNode node : filters) { + // Calcite encodes BETWEEN / IN / range unions as SEARCH(ref, Sarg); expand back to a + // comparison tree (>=, <=, AND, OR) before translating. + RexNode expanded = RexUtil.expandSearch(REX_BUILDER, null, node); + toComparison(expanded, names, types).ifPresent(pushed::add); + } + if (pushed.isEmpty()) { + return Optional.empty(); + } + if (pushed.size() == 1) { + return Optional.of(pushed.getFirst()); + } + return Optional.of(RowFilter.and(pushed.toArray(RowFilter[]::new))); + } + + /// Collects every column the filter references into `out`, so the scan can include them for + /// zone-map pruning regardless of projection. + private static void collectColumns(RowFilter filter, java.util.Set out) { + switch (filter) { + case RowFilter.And(var parts) -> parts.forEach(f -> collectColumns(f, out)); + case RowFilter.Eq(var col, var ignored) -> out.add(col); + case RowFilter.Neq(var col, var ignored) -> out.add(col); + case RowFilter.Gt(var col, var ignored) -> out.add(col); + case RowFilter.Gte(var col, var ignored) -> out.add(col); + case RowFilter.Lt(var col, var ignored) -> out.add(col); + case RowFilter.Lte(var col, var ignored) -> out.add(col); + case RowFilter.IsNull(var col) -> out.add(col); + case RowFilter.IsNotNull(var col) -> out.add(col); + } + } + + private static Optional toComparison(RexNode node, List names, List types) { + if (!(node instanceof RexCall call)) { + return Optional.empty(); + } + return switch (call.getKind()) { + case AND -> { + List parts = new ArrayList<>(); + for (RexNode operand : call.getOperands()) { + toComparison(operand, names, types).ifPresent(parts::add); + } + yield parts.isEmpty() ? Optional.empty() + : Optional.of(RowFilter.and(parts.toArray(RowFilter[]::new))); + } + case EQUALS, NOT_EQUALS, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL -> + binary(call, names, types); + default -> Optional.empty(); + }; + } + + private static Optional binary(RexCall call, List names, List types) { + List ops = call.getOperands(); + if (ops.size() != 2 || !(ops.get(0) instanceof RexInputRef ref) || !(ops.get(1) instanceof RexLiteral lit)) { + return Optional.empty(); + } + Object val = literalValue(lit, types.get(ref.getIndex())); + if (val == null) { + return Optional.empty(); + } + String col = names.get(ref.getIndex()); + Comparable cmp = (Comparable) val; + return Optional.of(switch (call.getKind()) { + case EQUALS -> RowFilter.eq(col, val); + case NOT_EQUALS -> RowFilter.neq(col, val); + case LESS_THAN -> RowFilter.lt(col, cmp); + case LESS_THAN_OR_EQUAL -> RowFilter.lte(col, cmp); + case GREATER_THAN -> RowFilter.gt(col, cmp); + case GREATER_THAN_OR_EQUAL -> RowFilter.gte(col, cmp); + default -> throw new IllegalStateException("unreachable kind: " + call.getKind()); + }); + } + + /// Coerces a SQL literal to the Java type the column's zone-map statistics are stored as, so + /// the comparison in [RowFilter] is type-compatible. Integer scalars are stored as `Long` and + /// floats as `Double` in the stats, regardless of the column's physical width — a mismatched + /// boxed type silently disables pruning (the comparator swallows the `ClassCastException`). + /// Returns `null` for unsupported columns. + private static Object literalValue(RexLiteral lit, DType type) { + return switch (type) { + case DType.Utf8 _ -> lit.getValueAs(String.class); + case DType.Primitive p -> switch (p.ptype()) { + case F64, F32 -> lit.getValueAs(Double.class); + case I64, U64, I32, U32, I16, U16, I8, U8 -> lit.getValueAs(Long.class); + default -> null; + }; + default -> null; + }; + } + + private static RelDataType toSqlType(RelDataTypeFactory factory, DType type) { + RelDataType sql = switch (type) { + case DType.Primitive p -> switch (p.ptype()) { + case F64 -> factory.createSqlType(SqlTypeName.DOUBLE); + case F32 -> factory.createSqlType(SqlTypeName.REAL); + case I64, U64 -> factory.createSqlType(SqlTypeName.BIGINT); + case I32, U32 -> factory.createSqlType(SqlTypeName.INTEGER); + case I16, U16 -> factory.createSqlType(SqlTypeName.SMALLINT); + case I8, U8 -> factory.createSqlType(SqlTypeName.TINYINT); + default -> throw new IllegalStateException("unsupported ptype: " + p.ptype()); + }; + case DType.Utf8 _ -> factory.createSqlType(SqlTypeName.VARCHAR); + case DType.Bool _ -> factory.createSqlType(SqlTypeName.BOOLEAN); + default -> throw new IllegalStateException("unsupported column dtype: " + type); + }; + return factory.createTypeWithNullability(sql, type.nullable()); + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java new file mode 100644 index 00000000..e004ecef --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java @@ -0,0 +1,190 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.VortexReader; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Phase 2: proves [VortexAggregatePushDownRule] rewrites a whole-table `MIN`/`MAX`/`COUNT` query +/// into a single-row `Values` computed from zone-map statistics — no table scan in the final plan. +class AggregatePushDownTest { + + private static final int ROWS = 200_000; + private static final int CHUNK = 10_000; + + @TempDir + static Path tmp; + private static Path file; + + @BeforeAll + static void writeFile() throws Exception { + file = tmp.resolve("ohlc.vortex"); + OhlcGenerator.write(file, ROWS, CHUNK); + } + + @Test + void minMaxCountRewriteToValuesFromStats() throws Exception { + // Given a planner over the OHLC schema and a whole-table MIN/MAX/COUNT query + SchemaPlus root = Frameworks.createRootSchema(true); + SchemaPlus vtx = root.add("vtx", new VortexSchema(Map.of("ohlc", file))); + FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(vtx) + // Preserve identifier case so unquoted `ohlc` matches the registered table name. + .parserConfig(org.apache.calcite.sql.parser.SqlParser.config() + .withUnquotedCasing(org.apache.calcite.avatica.util.Casing.UNCHANGED)) + .build(); + Planner planner = Frameworks.getPlanner(config); + SqlNode parsed = planner.parse("select min(low), max(high), count(*) from ohlc"); + RelNode logical = planner.rel(planner.validate(parsed)).rel; + + // When the aggregate push-down rule runs + HepProgram program = new HepProgramBuilder() + .addRuleCollection(VortexAggregatePushDownRule.RULES) + .build(); + HepPlanner hep = new HepPlanner(program); + hep.setRoot(logical); + RelNode optimized = hep.findBestExp(); + + String plan = RelOptUtil.toString(optimized); + System.out.println(); + System.out.println("Aggregate push-down — optimized plan:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // Then the plan is a single-row Values with no scan or aggregate left + assertThat(plan).contains("LogicalValues").doesNotContain("TableScan").doesNotContain("Aggregate"); + + Values values = findValues(optimized); + assertThat(values).isNotNull(); + List ohlcRow = values.getTuples().getFirst(); + + // And the literal values equal what the stats say (no data was decoded to produce them) + try (VortexReader reader = VortexReader.open(file)) { + VortexAggregates.Summary low = VortexAggregates.of(reader, "low"); + VortexAggregates.Summary high = VortexAggregates.of(reader, "high"); + assertThat(ohlcRow.get(0).getValueAs(Double.class)).isEqualTo(((Number) low.min()).doubleValue()); + assertThat(ohlcRow.get(1).getValueAs(Double.class)).isEqualTo(((Number) high.max()).doubleValue()); + assertThat(ohlcRow.get(2).getValueAs(Long.class)).isEqualTo((long) ROWS); + } + } + + @Test + @SuppressWarnings("try") // the Hook.Closeable is used only for its scope (deregister on close) + void pushDownRunsEndToEndThroughJdbcPlanner() throws Exception { + // Given a JDBC Calcite connection over the OHLC schema with the rule registered on the + // (Volcano) planner via Hook.PLANNER + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + String sql = "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + try (Hook.Closeable ignored = Hook.PLANNER.addThread( + (Consumer) planner -> VortexAggregatePushDownRule.RULES.forEach(planner::addRule)); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + // When EXPLAIN is run, the chosen plan answers from stats — a Values, no scan/aggregate + String plan; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("explain plan for " + sql)) { + rs.next(); + plan = rs.getString(1); + } + System.out.println(); + System.out.println("JDBC aggregate push-down — chosen plan:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // And the query executes, producing the stats-derived row + double lo; + double hi; + long c; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + rs.next(); + lo = rs.getDouble("lo"); + hi = rs.getDouble("hi"); + c = rs.getLong("c"); + } + + // Then the plan touched no table scan and the values match the zone-map stats + assertThat(plan).containsIgnoringCase("Values").doesNotContain("TableScan").doesNotContain("Aggregate"); + try (VortexReader reader = VortexReader.open(file)) { + assertThat(lo).isEqualTo(((Number) VortexAggregates.of(reader, "low").min()).doubleValue()); + assertThat(hi).isEqualTo(((Number) VortexAggregates.of(reader, "high").max()).doubleValue()); + } + assertThat(c).isEqualTo(ROWS); + } + } + + @Test + @SuppressWarnings("try") // the Hook.Closeable is used only for its scope (deregister on close) + void filteredAggregateIsNotAnsweredFromWholeTableStats() throws Exception { + // Given the rule on the JDBC (Volcano) planner and a WHERE that pushes a predicate into the + // scan — whole-table stats must not be used to answer a filtered aggregate + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + try (Hook.Closeable ignored = Hook.PLANNER.addThread( + (Consumer) planner -> VortexAggregatePushDownRule.RULES.forEach(planner::addRule)); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + // When MIN(low) is taken over only the rows with low > 10 + double min; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("select min(low) lo from vtx.ohlc where low > 10")) { + rs.next(); + min = rs.getDouble("lo"); + } + + // Then it is the filtered minimum (> 10), not the whole-table MIN (≈ 0.x) the rule would + // wrongly return if it ignored the pushed-down filter + assertThat(min).isGreaterThan(10.0); + } + } + + // Note: the absent-MIN/MAX-stat and absent-NULL_COUNT guards in VortexAggregatePushDownRule are + // defensive for files whose writer omits per-column stats (e.g. some Rust-written columns or + // future stat-less encodings). The in-house Java writer always emits min/max/null_count for + // primitive columns (verified), so those abandon paths cannot be exercised from this module. + + private static Values findValues(RelNode node) { + if (node instanceof Values v) { + return v; + } + for (RelNode input : node.getInputs()) { + Values found = findValues(input); + if (found != null) { + return found; + } + } + return null; + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java new file mode 100644 index 00000000..7ac52b53 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java @@ -0,0 +1,61 @@ +package io.github.dfa1.vortex.calcite; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +/// Profiling harness (disabled in normal runs). Writes a 1M-row OHLC Vortex file once, then runs +/// the full-table `MIN/MAX/COUNT` SQL many times through Calcite so a CPU profile has enough +/// samples to show where the scan path spends its time. +/// +/// Run under async-profiler: +/// ``` +/// ./mvnw test -pl calcite -am -Dtest=CalciteDemo -Ddemo.profile=true \ +/// -DargLine="-agentpath:/opt/homebrew/lib/libasyncProfiler.dylib=start,event=cpu,file=/tmp/calcite.collapsed,collapsed" +/// ``` +class CalciteDemo { + + @Test + @EnabledIfSystemProperty(named = "demo.profile", matches = "true") + void profileFullScan() throws Exception { + int rows = Integer.getInteger("demo.rows", 1_000_000); + int iterations = Integer.getInteger("demo.iterations", 300); + + Path file = Files.createTempFile("ohlc-demo", ".vortex"); + try { + OhlcGenerator.write(file, rows, 10_000); + System.out.printf("wrote %,d rows -> %.1f MB%n", rows, Files.size(file) / 1048576.0); + + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + String sql = "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(org.apache.calcite.jdbc.CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + long t0 = System.nanoTime(); + long count = 0; + for (int i = 0; i < iterations; i++) { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + rs.next(); + count = rs.getLong("c"); + } + } + double ms = (System.nanoTime() - t0) / 1e6 / iterations; + System.out.printf("full scan x%d: %.2f ms/query | count=%,d%n", iterations, ms, count); + } + } finally { + Files.deleteIfExists(file); + } + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java new file mode 100644 index 00000000..88f17765 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java @@ -0,0 +1,41 @@ +package io.github.dfa1.vortex.calcite; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/// De-risk gate: proves Apache Calcite's Enumerable convention (runtime Java codegen +/// compiled by Janino) works on this project's JDK 25 target before any adapter is built. +/// +/// The `VALUES ... WHERE` query forces Calcite to generate and Janino-compile an +/// Enumerable program. If Janino cannot emit JDK 25 class files this test fails at +/// execution time, which is the signal to fall back to Calcite's interpreter convention. +class CalciteSmokeTest { + + @Test + void calciteEnumerableCodegenRunsOnJdk25() throws Exception { + // Given a Calcite JDBC connection (lex/parser defaults are enough for a literal query) + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + + // When a query that forces Enumerable codegen + Janino compilation is executed + int sum = 0; + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info); + Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery( + "select id from (values (1), (2), (3)) as t(id) where id > 1")) { + while (rs.next()) { + sum += rs.getInt("id"); + } + } + + // Then the rows survive the generated+compiled pipeline (2 + 3) + assertThat(sum).isEqualTo(5); + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java new file mode 100644 index 00000000..0aa0284b --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java @@ -0,0 +1,38 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.core.testing.OhlcData; +import io.github.dfa1.vortex.writer.VortexWriter; +import io.github.dfa1.vortex.writer.WriteOptions; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.LinkedHashMap; +import java.util.Map; + +/// Writes the shared [OhlcData] batches to a Vortex file with the in-house Java writer, using a +/// small chunk size so the file carries several zones (making zone-map push-down visible). +final class OhlcGenerator { + + private OhlcGenerator() { + } + + static void write(Path file, int totalRows, int chunkSize) throws IOException { + WriteOptions opts = new WriteOptions(chunkSize, true, 0.90, 0, true, false); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var writer = VortexWriter.create(ch, OhlcData.SCHEMA, opts)) { + for (OhlcData.Batch batch : OhlcData.generate(totalRows, chunkSize)) { + Map columns = new LinkedHashMap<>(); + columns.put("date", batch.date()); + columns.put("symbol", batch.symbol()); + columns.put("open", batch.open()); + columns.put("high", batch.high()); + columns.put("low", batch.low()); + columns.put("close", batch.close()); + columns.put("volume", batch.volume()); + writer.writeChunk(columns); + } + } + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java new file mode 100644 index 00000000..b3689532 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java @@ -0,0 +1,264 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Demo: SQL over a 1M-row OHLC Vortex file via the Calcite adapter. +/// +/// [#aggregatesMatchAndPushdownStaysFlatOverRepeatedQueries] compares the Calcite full scan +/// against Vortex zone-map push-down; [#variousQueryShapes] exercises GROUP BY / WHERE / HAVING +/// / ORDER BY+LIMIT / IN to show the adapter handles real SQL. Those shapes currently run as +/// full scans — they are exactly what Phase 1 (filter/project push-down) and Phase 2 (aggregate +/// push-down) in ADR 0018 will accelerate. +class OhlcSqlDemoTest { + + private static final int ROWS = 1_000_000; + private static final int CHUNK = 10_000; + private static final int WARMUP = 3; + private static final int ITERATIONS = 10; + + // MIN/MAX/COUNT only — the aggregates Vortex answers purely from zone-map stats, with no data + // segment decoded. SUM/AVG are deliberately left out: there is no per-zone SUM stat yet, so they + // would force a full scan and hide the real push-down cost (sub-millisecond here). + private static final String AGG_SQL = + "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + @TempDir + static Path tmp; + private static Path file; + + private record SqlAggs(double minLow, double maxHigh, long count) { + } + + private record Pushdown(Object minLow, Object maxHigh, long count) { + } + + @BeforeAll + static void writeFile() throws Exception { + file = tmp.resolve("ohlc.vortex"); + OhlcGenerator.write(file, ROWS, CHUNK); + } + + @Test + void aggregatesMatchAndPushdownStaysFlatOverRepeatedQueries() throws Exception { + // Given the shared OHLC file behind a Calcite schema + try (Connection conn = connect()) { + // Warm up both paths so timings reflect steady-state JIT, not first-call compile. + for (int i = 0; i < WARMUP; i++) { + runSql(conn); + runPushdown(file); + } + + // When each path runs ITERATIONS times, accumulating wall time + long sqlNanos = 0; + long pushdownNanos = 0; + SqlAggs sql = null; + Pushdown push = null; + for (int i = 0; i < ITERATIONS; i++) { + long a = System.nanoTime(); + sql = runSql(conn); + sqlNanos += System.nanoTime() - a; + + long b = System.nanoTime(); + push = runPushdown(file); + pushdownNanos += System.nanoTime() - b; + } + + printTiming(push, sqlNanos, pushdownNanos); + + // Then push-down min/max/count match the full-scan ground truth exactly + assertThat(((Number) push.minLow()).doubleValue()).isEqualTo(sql.minLow()); + assertThat(((Number) push.maxHigh()).doubleValue()).isEqualTo(sql.maxHigh()); + assertThat(push.count()).isEqualTo(sql.count()); + } + } + + @Test + void variousQueryShapes() throws Exception { + // Given the shared OHLC file behind a Calcite schema + try (Connection conn = connect()) { + // When / Then a spread of SQL shapes all run and return sensible results. + + // GROUP BY + ORDER BY + LIMIT: the five most-traded tickers. + long topRows = printAndCount(conn, "top 5 tickers by total volume", + "select symbol, count(*) days, max(high) hi, sum(volume) vol " + + "from vtx.ohlc group by symbol order by vol desc limit 5"); + assertThat(topRows).isEqualTo(5); + + // WHERE with a row-level predicate: how many 'up' days (close above open). + long upDays; + try (Statement st = conn.createStatement(); + // `close` and `open` are SQL reserved words (CLOSE/OPEN cursor) — must be quoted. + ResultSet rs = st.executeQuery("select count(*) c from vtx.ohlc where `close` > `open`")) { + rs.next(); + upDays = rs.getLong("c"); + } + System.out.printf("%n[up days] close > open: %,d of %,d rows%n", upDays, (long) ROWS); + assertThat(upDays).isBetween(0L, (long) ROWS); + + // WHERE IN + GROUP BY: average volume for a watchlist. + long watchRows = printAndCount(conn, "avg volume for AAPL/MSFT/NVDA", + "select symbol, avg(cast(volume as double)) av from vtx.ohlc " + + "where symbol in ('AAPL', 'MSFT', 'NVDA') group by symbol order by symbol"); + assertThat(watchRows).isEqualTo(3); + + // HAVING: tickers whose lifetime volume clears a threshold. + long bigRows = printAndCount(conn, "tickers with sum(volume) > 33.35e9", + "select symbol, sum(volume) v from vtx.ohlc group by symbol " + + "having sum(volume) > 33350000000 order by v desc"); + assertThat(bigRows).isBetween(1L, 29L); + + // ORDER BY + LIMIT on raw rows: the three highest single-day highs. + long peakRows = printAndCount(conn, "3 highest single-day highs", + "select symbol, high, volume from vtx.ohlc order by high desc limit 3"); + assertThat(peakRows).isEqualTo(3); + } + } + + @Test + void filterPushDownPrunesChunksAndShowsInExplain() throws Exception { + // Given the OHLC file (date is strictly increasing → zone-map prunable) behind a schema + VortexSchema schema = new VortexSchema(Map.of("ohlc", file)); + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema().add("vtx", schema); + + // A narrow date window: ~101 days out of ~33,333, landing in a single 10k-row chunk. + int startDay = (int) java.time.LocalDate.of(2020, 1, 2).toEpochDay(); + int lo = startDay + 10_000; + int hi = startDay + 10_100; + String windowed = "select count(*) c, max(high) h from vtx.ohlc " + + "where `date` between " + lo + " and " + hi; + + // When the unfiltered query runs, every chunk is decoded + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("select count(*) c from vtx.ohlc")) { + rs.next(); + } + long chunksFull = schema.table("ohlc").chunksScannedLastQuery(); + + // And the date-windowed query runs, pruning chunks via zone-map stats + long windowCount; + double windowMaxHigh; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(windowed)) { + rs.next(); + windowCount = rs.getLong("c"); + windowMaxHigh = rs.getDouble("h"); + } + long chunksPruned = schema.table("ohlc").chunksScannedLastQuery(); + + // And EXPLAIN shows the predicate folded into the scan (push-down landed) + String plan; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("explain plan for " + windowed)) { + rs.next(); + plan = rs.getString(1); + } + + System.out.printf("%n[filter push-down] date in [%d, %d]%n", lo, hi); + System.out.printf(" rows matched: %,d | max(high) in window: %.2f%n", windowCount, windowMaxHigh); + System.out.printf(" chunks decoded: %d (windowed) vs %d (full) — %.0f%% skipped by zone maps%n", + chunksPruned, chunksFull, 100.0 * (chunksFull - chunksPruned) / chunksFull); + System.out.println(" EXPLAIN:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // Then the windowed scan touched far fewer chunks than the full scan, exact result intact + assertThat(chunksFull).isEqualTo(ROWS / CHUNK); + assertThat(chunksPruned).isLessThanOrEqualTo(3); + assertThat(windowCount).isBetween(2900L, 3100L); + assertThat(windowMaxHigh).isLessThanOrEqualTo(5347.11); + // And the plan is a Bindable scan carrying pushed filters + assertThat(plan).contains("BindableTableScan").contains("filters"); + } + } + + private static Connection connect() throws Exception { + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info); + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + return conn; + } + + private static SqlAggs runSql(Connection conn) throws Exception { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(AGG_SQL)) { + rs.next(); + return new SqlAggs(rs.getDouble("lo"), rs.getDouble("hi"), rs.getLong("c")); + } + } + + /// Push-down path: read MIN(low)/MAX(high) straight from footer zone-map stats and COUNT(*) from + /// chunk metadata — no data segment is decoded. + private static Pushdown runPushdown(Path file) throws Exception { + try (VortexReader reader = VortexReader.open(file)) { + var stats = reader.columnStats(); + long total = 0; + try (ScanIterator scan = reader.scan(ScanOptions.all())) { + for (long c : scan.chunkRowCounts()) { + total += c; + } + } + return new Pushdown(stats.get("low").min(), stats.get("high").max(), total); + } + } + + /// Runs a query, prints every row as a labelled table, and returns the row count. + private static long printAndCount(Connection conn, String title, String sql) throws Exception { + System.out.printf("%n[%s]%n", title); + long rows = 0; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + StringBuilder header = new StringBuilder(" "); + for (int c = 1; c <= cols; c++) { + header.append(String.format("%-16s", md.getColumnLabel(c))); + } + System.out.println(header); + while (rs.next()) { + StringBuilder line = new StringBuilder(" "); + for (int c = 1; c <= cols; c++) { + line.append(String.format("%-16s", rs.getObject(c))); + } + System.out.println(line); + rows++; + } + } + return rows; + } + + private static void printTiming(Pushdown push, long sqlNanos, long pushdownNanos) { + double sqlMs = sqlNanos / 1e6 / ITERATIONS; + double pushMs = pushdownNanos / 1e6 / ITERATIONS; + System.out.println(); + System.out.printf("OHLC MIN/MAX/COUNT (%,d rows, %d zones) — %d repeated queries%n", + ROWS, ROWS / CHUNK, ITERATIONS); + System.out.printf(" %-14s %-22s %s%n", "AGGREGATE", "VALUE", "SOURCE"); + System.out.printf(" %-14s %-22s %s%n", "MIN(low)", push.minLow(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" %-14s %-22s %s%n", "MAX(high)", push.maxHigh(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" %-14s %-22d %s%n", "COUNT(*)", push.count(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" per-query avg — full scan (Calcite): %.2f ms | push-down (stats only): %.3f ms | %.0fx%n", + sqlMs, pushMs, sqlMs / pushMs); + } +} diff --git a/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java b/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java new file mode 100644 index 00000000..893e9f23 --- /dev/null +++ b/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java @@ -0,0 +1,116 @@ +package io.github.dfa1.vortex.core.testing; + +import io.github.dfa1.vortex.core.model.DType; +import io.github.dfa1.vortex.core.model.PType; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/// Writer-agnostic synthetic OHLC (open/high/low/close + volume) data generator, shared by tests +/// across modules via the core test-jar. +/// +/// Produces deterministic column arrays (seeded random walk per ticker, one row per ticker per +/// day) and exposes the matching [DType.Struct] schema. It performs no I/O — each module writes +/// the batches with its own writer (the in-house Java writer, the JNI writer, …). +public final class OhlcData { + + /// The 30 ticker symbols, assigned round-robin within each batch. + public static final String[] TICKERS = { + "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "BRK.B", "JPM", "V", + "UNH", "XOM", "LLY", "JNJ", "MA", "PG", "HD", "MRK", "AVGO", "CVX", + "PEP", "ABBV", "KO", "COST", "WMT", "MCD", "CSCO", "ACN", "BAC", "CRM" + }; + + /// The column schema: `date` (I32 epoch day), `symbol` (Utf8), `open`/`high`/`low`/`close` + /// (F64), `volume` (I64); all non-nullable. + public static final DType.Struct SCHEMA = new DType.Struct( + List.of("date", "symbol", "open", "high", "low", "close", "volume"), + List.of( + new DType.Primitive(PType.I32, false), + new DType.Utf8(false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.I64, false)), + false); + + /// One chunk of OHLC rows as parallel column arrays. + /// + /// @param date epoch-day per row + /// @param symbol ticker per row + /// @param open opening price per row + /// @param high high price per row + /// @param low low price per row + /// @param close closing price per row + /// @param volume traded volume per row + public record Batch(int[] date, String[] symbol, double[] open, double[] high, + double[] low, double[] close, long[] volume) { + } + + private OhlcData() { + } + + /// Generates `rows` rows split into chunks of at most `chunkSize`, using the default seed. + /// + /// @param rows total number of rows + /// @param chunkSize maximum rows per batch + /// @return the batches, in row order + public static List generate(int rows, int chunkSize) { + return generate(rows, chunkSize, 42L); + } + + /// Generates `rows` rows split into chunks of at most `chunkSize`, seeded by `seed`. + /// + /// @param rows total number of rows + /// @param chunkSize maximum rows per batch + /// @param seed the PRNG seed + /// @return the batches, in row order + public static List generate(int rows, int chunkSize, long seed) { + double[] prices = new double[TICKERS.length]; + Arrays.fill(prices, 100.0); + var rng = new Random(seed); + int startDay = (int) LocalDate.of(2020, 1, 2).toEpochDay(); + int rowsDone = 0; + var batches = new ArrayList(); + + int rowsLeft = rows; + while (rowsLeft > 0) { + int n = Math.min(rowsLeft, chunkSize); + int[] date = new int[n]; + String[] symbol = new String[n]; + double[] open = new double[n]; + double[] high = new double[n]; + double[] low = new double[n]; + double[] close = new double[n]; + long[] volume = new long[n]; + for (int i = 0; i < n; i++) { + int ticker = i % TICKERS.length; + double px = prices[ticker]; + double ret = rng.nextGaussian() * 0.02; + double o = round(px * (1 + ret * 0.3)); + double c = round(px * (1 + ret)); + double spread = Math.abs(px * rng.nextDouble() * 0.03); + date[i] = startDay + (rowsDone + i) / TICKERS.length; + symbol[i] = TICKERS[ticker]; + open[i] = o; + high[i] = round(Math.max(o, c) + spread); + low[i] = round(Math.min(o, c) - spread); + close[i] = c; + volume[i] = Math.max(100_000L, Math.round(1_000_000 + rng.nextGaussian() * 200_000)); + prices[ticker] = c; + } + batches.add(new Batch(date, symbol, open, high, low, close, volume)); + rowsLeft -= n; + rowsDone += n; + } + return batches; + } + + private static double round(double v) { + return Math.round(v * 100.0) * 0.01; + } +} diff --git a/docs/adr/0018-calcite-sql-adapter.md b/docs/adr/0018-calcite-sql-adapter.md new file mode 100644 index 00000000..e33cb3e5 --- /dev/null +++ b/docs/adr/0018-calcite-sql-adapter.md @@ -0,0 +1,219 @@ +# ADR 0018: Apache Calcite SQL adapter — be a push-down source, not an engine + +- **Status:** Proposed +- **Date:** 2026-06-24 +- **Deciders:** project maintainer +- **Supersedes:** — +- **Superseded by:** — +- **Related:** [ADR 0013 — Compute primitives](0013-compute-primitives.md), + [ADR 0016 — vortex-arrow bridge](0016-vortex-arrow-bridge.md), + [ADR 0005 — Vector API adoption](0005-vector-api-adoption.md) + +## Context + +The reader exposes typed zero-copy columns and (ADR 0013) a path to filter/aggregate +push-down over the encoded form and the zone-map stats table. What it does *not* have is a +query language. A recurring ask is "can I run SQL over a Vortex file?" — `SELECT … WHERE … +GROUP BY …` rather than hand-written scan loops. + +Two ways to answer that: + +1. **Build a SQL engine.** Parser, type system, logical plan, cost-based optimiser, join + algorithms, aggregation/spilling, NULL semantics. Person-years, none of it + Vortex-specific — it is solved, generic, and not where this project's advantage lies. +2. **Adapt to an existing engine.** Plug the Vortex scan into a mature SQL front-end and let + it own parsing/planning/joins. The project keeps owning the one thing only it can do well: + a fast columnar scan with push-down into the encoding and the stats table. + +The project's advantage is the **scan**, not the engine. An external engine only ever sees +*decoded* values, so it has already paid the cost Vortex could have skipped — chunk-skipping +via zone maps, encoded-domain comparison (`compare(ALPArray, scalar)` without materialising +doubles), and answering `MIN`/`MAX`/`COUNT`/`SUM` straight from the stats table. That asymmetry +only exists *inside* Vortex. So the goal is to expose the scan + push-down to an engine, not to +reimplement the engine. + +Apache Calcite is the natural JVM host: a pure-Java SQL parser + relational optimiser + +pluggable adapter framework (the substrate under Drill, Flink-SQL, Beam-SQL). It does the +generic work; the adapter contributes a table that knows how to push work down. + +### Prototype (this branch) + +A `calcite` module (`vortex-calcite`, Calcite 1.40.0) was built to de-risk and to anchor this +ADR in working code rather than speculation: + +- **JDK 25 / Janino gate.** `CalciteSmokeTest` runs a query that forces Calcite's Enumerable + convention to generate Java and compile it with Janino. It **passes on JDK 25** — Calcite's + runtime codegen works on this project's target, so the interpreter fallback is not needed. + This was the one real unknown (Janino has historically lagged on new class-file versions — + the same pressure that produced the in-house codegen of ADR 0017). +- **`VortexTable implements ScannableTable`** — derives the SQL row type from the file's + `DType.Struct` and decodes chunks into `Object[]` rows. Baseline only: every full-scan row + crosses the `Object[]` boundary. +- **`VortexSchema`** — maps a SQL table name to a `.vortex` file. +- **`VortexAggregates`** — the push-down core, deliberately built *beside* Calcite (not yet as + a planner rule): `MIN`/`MAX`/`COUNT` are read from footer zone-map stats with no data decode; + `SUM`/`AVG` fall back to a streaming scan because the writer emits no per-zone `SUM` stat yet. +- **`OhlcSqlDemoTest`** — 120 000-row OHLC file, computes the five aggregates via Calcite (full + scan, ground truth) and via push-down, asserts they agree. + +Measured on the demo (120 000 rows, 12 zones): + +``` +AGGREGATE VALUE SOURCE +MIN(low) 3.65 ZONE_STATS_PUSHDOWN +MAX(high) 1048.1 ZONE_STATS_PUSHDOWN +COUNT(*) 120000 ZONE_STATS_PUSHDOWN +SUM(volume) 120023864741 FULL_SCAN +AVG(volume) 1000198.87 FULL_SCAN +full scan (Calcite): ~577 ms | push-down (min/max/count from stats): ~5.9 ms (~100×) +``` + +Two SQL-semantics facts surfaced and must be carried forward by the production adapter: + +- `date` is a SQL reserved word — a column of that name must be quoted, or queries avoid it. +- Calcite's `AVG` over a `BIGINT` column does **integer** division; a true mean needs + `AVG(CAST(col AS DOUBLE))`. The adapter's own aggregate path must match whichever semantics + Calcite's planner applies, so a rule-based push-down stays bit-compatible with the fallback. + +## Decision + +**Ship a `vortex-calcite` adapter that makes Vortex a push-down SQL source for Apache Calcite. +Do not build a SQL engine. Keep the prototype's structure; productionise it in phases.** The +heavy Calcite dependency tree (Avatica, Guava, Janino) is **quarantined in this module** — +`core`/`reader`/`writer` stay dependency-light, the same isolation rule applied to the planned +`vortex-arrow` bridge (ADR 0016). + +Phased productionisation: + +- **Phase 0 — queryable (prototype, done).** `ScannableTable` + `VortexSchema`; `SELECT *` and + any aggregate work via full scan. Proves the wiring and the JDK 25 codegen path. +- **Phase 1 — filter + project push-down.** Implement `ProjectableFilterableTable`: projection + prunes columns (decode only what's asked); filters translate Calcite `RexNode`s into the + ADR 0013 `Predicate` vocabulary and push into the scan's `RowFilter` (zone-map chunk-skip + + encoded-domain compare). Unsupported `RexNode`s are left for Calcite — push-down is + best-effort, never a parse failure. +- **Phase 2 — aggregate push-down (the payoff).** A `RelOptRule` matching `Aggregate(TableScan)` + for `MIN`/`MAX`/`SUM`/`COUNT` rewrites to a stats-backed physical scan folding the zone-map + stats table — promoting today's `VortexAggregates` from a side helper to a planner rule. + `MIN`/`MAX`/`COUNT` work now; `SUM`/`AVG` need the writer to emit a per-zone `SUM` stat first + (ADR 0013 §6 — the same increment that also wants `NULL_COUNT`). + +### Prototype status (branch `feat/vortex-calcite-demo`) + +Phases 0–2 are implemented and tested: + +- **Phase 1 landed.** `VortexTable` is a `ProjectableFilterableTable`: projection prunes columns; + `=, <>, <, <=, >, >=, AND, BETWEEN, IN` translate to a reader `RowFilter` for zone-map chunk + skipping. Filters are pushed but **not consumed** (whole-chunk pruning is approximate, so + Calcite still row-filters). Demo: a `date` range over 1M rows decodes **1 of 100 chunks** (99% + pruned), exact result, and `EXPLAIN` shows `filters`/`projects` folded into `BindableTableScan`. +- **Phase 2 landed (MIN/MAX/COUNT).** `VortexAggregatePushDownRule` rewrites a whole-table + `MIN`/`MAX`/`COUNT` (no `GROUP BY`, numeric columns) into a single-row `LogicalValues` from the + stats — the optimized plan has **no scan and no aggregate**. `SUM`/`AVG` remain a scan pending + the writer per-zone `SUM` stat. + +Gotchas found and recorded for the production adapter: + +- **Reserved words.** Beyond `date`, the OHLC columns `close`/`open` (CLOSE/OPEN cursor) must be + quoted. A production adapter should quote all identifiers it emits. +- **Integer stats are `Long`.** Zone-map integer stats decode as `Long`, floats as `Double`, + regardless of column width. A filter literal boxed at the column's natural width (`Integer` for + `I32`) silently disables pruning because `ScanIterator.compareValues` swallows the resulting + `ClassCastException` and returns `0` (= "cannot prune"). The adapter coerces integer literals to + `Long`; the underlying reader trap is filed as issue #159. +- **`BETWEEN`/`IN` are `SEARCH(Sarg)`**, not `AND(>=,<=)`/`OR` — expand with `RexUtil.expandSearch` + before translating. +- **Calcite 1.40 removed `RelRule.Config.EMPTY`.** The modern `RelRule.Config` path needs the + Immutables annotation processor; the deprecated `RelOptRule` operand constructor is lighter for + a single adapter rule (localized `@SuppressWarnings("deprecation")`). + +The prototype registers the Phase-2 rule through a `HepPlanner` in a test. Wiring it into the +bare `jdbc:calcite:` planner (so SQL over JDBC is auto-rewritten) is the main remaining +productionisation step, alongside the writer `SUM` zone-stat. + +**Two doors, chosen by query shape.** Calcite is the right tool for *reducing* queries (filter +/ aggregate / group-by), where push-down shrinks the result and the `Object[]` boundary +amortises to near-nothing. It is the wrong tool for *bulk columnar extract* (`SELECT *` over a +large file, weak/no filter): every row boxes. That shape should bypass Calcite via the Arrow +C-Data export of ADR 0016 (Option B), columnar and zero-boxing. Calcite is never Vortex's +*execution* engine — the moment many rows flow *through* it, the wrong door was used. The +adapter's job is to push filter/project/aggregate down hard so what reaches `Object[]` is small +by construction. + +## Consequences + +### Positive + +- SQL over Vortex with no engine to build or maintain; Calcite owns parse/plan/optimise/join. +- The push-down surface is exactly the ADR 0013 primitive set — the adapter is the first real + consumer of that vocabulary, validating it against a real planner. +- Dependency isolation: only consumers who add `vortex-calcite` pay the Calcite/Janino/Guava + cost; the core stays clean and (per ADR 0017) JPMS-viable. +- Aggregate push-down gives the headline result — `MIN`/`MAX`/`COUNT` answered from a tiny stats + table, ~100× faster than a full scan on the prototype, exact against ground truth. +- Concrete motivation for the writer-side `SUM`/`NULL_COUNT` zone-stat increment: once emitted, + `SUM`/`AVG`/`COUNT` join the no-decode tier. + +### Negative + +- Calcite's Enumerable execution is row-at-a-time `Object[]` with autoboxing. Acceptable for a + *source* (push-down does the heavy work before rows materialise), but it caps throughput on + any query that emits many rows — which is why bulk extract is pushed to the Arrow door, not + Calcite. +- A second public surface (SQL) with its own semantics gotchas (reserved words, `AVG` integer + division, three-valued logic) the adapter must honour exactly to stay consistent with the + push-down path. +- The `RelOptRule` for aggregate push-down is non-trivial Calcite-internal work; the kernel + matrix (Array variant × Predicate variant) from ADR 0013 still has to exist underneath. + +### Risks to manage + +- **Push-down/fallback divergence.** A pushed-down aggregate must produce bit-identical results + to Calcite's own computation over the scanned rows (NULL handling, integer vs double `AVG`, + overflow). Test every pushed aggregate against the full-scan ground truth, as the prototype + does. +- **Calcite version churn.** Calcite's adapter SPI and Avatica move between releases; pin the + version and keep the smoke test as the JDK-compatibility tripwire (re-run on JDK upgrades). +- **Temptation to make Calcite columnar.** Bridging Vortex into a vectorised execution engine is + the rabbit hole; that is DuckDB/DataFusion territory (native — the `vortex-jni` world). Resist + it. Keep Calcite as front-end + planner only. +- **Lifetime.** `Object[]` rows hold decoded values, but any zero-copy path (future) must keep + the reader's `Arena` alive while the `Enumerator` is open — the same release-callback hazard + flagged for the Arrow C-Data bridge (ADR 0016). + +## Alternatives considered + +### A. Build a small bespoke SQL engine + +A hand-rolled parser + executor over the ADR 0013 kernels (single table, `WHERE`, simple +aggregates, no joins). Attractive as a *demo* of push-down, but it dies at the first join or +optimiser requirement and re-implements solved, generic machinery. Rejected as a product +direction; the prototype's value is the adapter + push-down helper, not a query language. + +### B. Hand off entirely to DuckDB / DataFusion via Arrow + +Export decoded columns through the Arrow C-Data interface (ADR 0016 Option B) and let a mature +native engine run all SQL. Least code, fastest *execution* — but the engine receives *decoded* +Arrow, so encoded-domain push-down (Vortex's whole edge) is lost; only zone-map chunk-skip +survives, and only if Vortex pre-filters before export. This is the right path for *bulk +extract* and for native consumers, and it coexists with this ADR as the "second door" — not a +replacement for JVM SQL with push-down. + +### C. Apache Calcite with full custom physical convention + +Implement a complete `Convention` with vectorised Vortex operators so execution stays columnar +end-to-end. Maximum performance, but it means building a vectorised execution engine inside +Calcite — most of option A's cost plus Calcite-internal complexity. Deferred indefinitely; the +`Object[]` Enumerable path plus aggressive push-down is sufficient for the source role, and bulk +throughput belongs to the Arrow door. + +## References + +- Prototype: `feat/vortex-calcite-demo` — `calcite/` module (`VortexTable`, `VortexSchema`, + `VortexAggregates`, `CalciteSmokeTest`, `OhlcSqlDemoTest`) +- [ADR 0013 — Compute primitives](0013-compute-primitives.md) §6 (aggregate push-down via + zone-map stats; `SUM`/`NULL_COUNT` writer increment) +- [ADR 0016 — vortex-arrow bridge](0016-vortex-arrow-bridge.md) (the "bulk extract" door) +- [ADR 0017 — in-house FlatBuffers codegen](0017-in-house-flatbuffers-codegen.md) (JDK 25 / + generated-code pressure that motivated the Janino smoke test) +- Apache Calcite adapter SPI — https://calcite.apache.org/docs/adapter.html diff --git a/docs/adr/ADR.md b/docs/adr/ADR.md index e75686a2..a84f0143 100644 --- a/docs/adr/ADR.md +++ b/docs/adr/ADR.md @@ -30,3 +30,4 @@ Each ADR is a Markdown file named `NNNN-short-title.md`. Use `template.md` as th | 0015 | Drop Materialized fallbacks once Lazy has shipped | Completed | 0.8.0 | | 0016 | vortex-arrow bridge module for Arrow interop | Proposed | | | 0017 | In-house FlatBuffers codegen + MemorySegment runtime | Accepted | | +| 0018 | Apache Calcite SQL adapter — push-down source | Proposed | | diff --git a/integration/pom.xml b/integration/pom.xml index b5334b7d..0f58d97e 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -49,6 +49,12 @@ hardwood-core test + + io.github.dfa1.vortex + vortex-core + test-jar + test + io.github.dfa1.vortex vortex-reader diff --git a/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java b/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java index 19cbe59b..6c317d3a 100644 --- a/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java +++ b/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java @@ -1,18 +1,15 @@ package io.github.dfa1.vortex.integration; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Arrays; +import io.github.dfa1.vortex.core.testing.OhlcData; + import java.util.List; -import java.util.Random; -class OhlcGenerator { +/// Thin adapter over the shared [OhlcData] generator (core test-jar), mapping each batch to the +/// field order the integration writers expect (`symbols`/`dates` first). The random-walk lives in +/// [OhlcData]; this only adapts the shape. +final class OhlcGenerator { - static final String[] TICKERS = { - "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "BRK.B", "JPM", "V", - "UNH", "XOM", "LLY", "JNJ", "MA", "PG", "HD", "MRK", "AVGO", "CVX", - "PEP", "ABBV", "KO", "COST", "WMT", "MCD", "CSCO", "ACN", "BAC", "CRM" - }; + static final String[] TICKERS = OhlcData.TICKERS; private OhlcGenerator() { } @@ -22,41 +19,9 @@ static List generate(int totalRows, int batchSize) { } static List generate(int totalRows, int batchSize, long seed) { - double[] prices = new double[TICKERS.length]; - Arrays.fill(prices, 100.0); - var rng = new Random(seed); - int startDay = (int) LocalDate.of(2020, 1, 2).toEpochDay(); - int rowsLeft = totalRows; - int rowsDone = 0; - var batches = new ArrayList(); - - while (rowsLeft > 0) { - int n = Math.min(rowsLeft, batchSize); - String[] symbols = new String[n]; - int[] dates = new int[n]; - double[] open = new double[n], high = new double[n], low = new double[n], close = new double[n]; - long[] volume = new long[n]; - for (int i = 0; i < n; i++) { - int ticker = i % TICKERS.length; - double px = prices[ticker]; - double ret = rng.nextGaussian() * 0.02; - double o = Math.round(px * (1 + ret * 0.3) * 100.0) * 0.01; - double c = Math.round(px * (1 + ret) * 100.0) * 0.01; - double spread = Math.abs(px * rng.nextDouble() * 0.03); - symbols[i] = TICKERS[ticker]; - dates[i] = startDay + (rowsDone + i) / TICKERS.length; - open[i] = o; - high[i] = Math.round((Math.max(o, c) + spread) * 100.0) * 0.01; - low[i] = Math.round((Math.min(o, c) - spread) * 100.0) * 0.01; - close[i] = c; - volume[i] = Math.max(100_000L, Math.round(1_000_000 + rng.nextGaussian() * 200_000)); - prices[ticker] = c; - } - batches.add(new OhlcBatch(symbols, dates, open, high, low, close, volume)); - rowsLeft -= n; - rowsDone += n; - } - return batches; + return OhlcData.generate(totalRows, batchSize, seed).stream() + .map(b -> new OhlcBatch(b.symbol(), b.date(), b.open(), b.high(), b.low(), b.close(), b.volume())) + .toList(); } record OhlcBatch(String[] symbols, int[] dates, double[] open, double[] high, diff --git a/pom.xml b/pom.xml index 7d421574..fe735a46 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ bom cli inspector + calcite integration performance