From 22d3660c27cfe10643cd98cfdb04e6ff4a3481b5 Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Thu, 25 Jun 2026 07:44:34 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(calcite):=20vortex-calcite=20=E2=80=94?= =?UTF-8?q?=20SQL=20adapter=20with=20zone-map=20push-down?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New `vortex-calcite` module (Apache Calcite 1.40) exposing a Vortex file as a SQL table, with filter/projection/aggregate push-down into the reader's existing zone-map primitives. ADR 0018 records the decision: be a push-down source, not a query engine. - VortexTable (ProjectableFilterableTable): DType.Struct -> SQL row type; projection prunes columns; Calcite predicates (=,<>,<,<=,>,>=,AND,BETWEEN,IN via RexUtil.expandSearch) translate to a reader RowFilter for zone-map chunk skipping (pushed, not consumed — pruning is approximate, Calcite re-checks rows). - VortexAggregatePushDownRule: rewrites a whole-table MIN/MAX/COUNT over a VortexTable into a single-row Values computed from footer stats — no scan, no decode. Registered end-to-end on the JDBC planner via Hook.PLANNER. - VortexAggregates / VortexSchema: stats-backed helpers; sum is exact Long for integer columns (no double precision loss). - Demos (OhlcSqlDemoTest, AggregatePushDownTest): 1M-row OHLC, MIN/MAX/COUNT ~44x vs full scan; date-range filter prunes 99% of chunks; EXPLAIN shows the rewrite. - CalciteSmokeTest gates Calcite/Janino runtime codegen on JDK 25. Heavy Calcite deps quarantined in this module; core/reader/writer stay clean. OHLC test data is single-sourced in core.testing.OhlcData (core test-jar), reused by calcite and (via a thin adapter) integration. 
Co-Authored-By: Claude Opus 4.8 --- TODO.md | 10 + calcite/pom.xml | 58 ++++ .../calcite/VortexAggregatePushDownRule.java | 198 +++++++++++ .../dfa1/vortex/calcite/VortexAggregates.java | 125 +++++++ .../dfa1/vortex/calcite/VortexSchema.java | 48 +++ .../dfa1/vortex/calcite/VortexTable.java | 319 ++++++++++++++++++ .../vortex/calcite/AggregatePushDownTest.java | 190 +++++++++++ .../dfa1/vortex/calcite/CalciteSmokeTest.java | 41 +++ .../dfa1/vortex/calcite/OhlcGenerator.java | 38 +++ .../dfa1/vortex/calcite/OhlcSqlDemoTest.java | 264 +++++++++++++++ .../dfa1/vortex/core/testing/OhlcData.java | 116 +++++++ docs/adr/0018-calcite-sql-adapter.md | 219 ++++++++++++ docs/adr/ADR.md | 1 + integration/pom.xml | 6 + .../vortex/integration/OhlcGenerator.java | 55 +-- pom.xml | 1 + 16 files changed, 1644 insertions(+), 45 deletions(-) create mode 100644 calcite/pom.xml create mode 100644 calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java create mode 100644 calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java create mode 100644 calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java create mode 100644 calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java create mode 100644 calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java create mode 100644 calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java create mode 100644 calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java create mode 100644 calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java create mode 100644 core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java create mode 100644 docs/adr/0018-calcite-sql-adapter.md diff --git a/TODO.md b/TODO.md index 5cf4c394a..2068b6e99 100644 --- a/TODO.md +++ b/TODO.md @@ -6,6 +6,16 @@ - [ ] Create website - build something like hardwood.dev but for vortex files +## Testing + +- [ ] **Finish OHLC test-data 
dedup** — the random-walk generator is single-sourced in + `core.testing.OhlcData` (core test-jar). `integration`'s `OhlcGenerator` is now a thin adapter + that maps `OhlcData.Batch` to its own `OhlcBatch` (`symbols`/`dates` field names) only to avoid + churning the JNI/Arrow callers. Align fully: drop `OhlcGenerator`/`OhlcBatch`, switch the + integration callers (`FileSizeComparisonIntegrationTest`, `JavaWritesRustReadsIntegrationTest`) + to `OhlcData.Batch` directly so there is one shape, not two. Verify with the JNI integration + suite (needs vortex-jni native libs). + ## Performance - [ ] **Benchmark publishing** — drop CI workflow, add `bench-publish` script; see [ADR-0006](docs/adr/0006-benchmark-publishing.md). diff --git a/calcite/pom.xml b/calcite/pom.xml new file mode 100644 index 000000000..c8073a941 --- /dev/null +++ b/calcite/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + io.github.dfa1.vortex + vortex-java + 0.9.1-SNAPSHOT + + + vortex-calcite + + vortex-calcite + Apache Calcite SQL adapter over the Vortex columnar file format (demo: filter/project/aggregate push-down). 
+ + + 1.40.0 + + + + + + io.github.dfa1.vortex + vortex-reader + + + org.apache.calcite + calcite-core + ${calcite.version} + + + + io.github.dfa1.vortex + vortex-core + test-jar + test + + + io.github.dfa1.vortex + vortex-writer + test + + + io.airlift + aircompressor-v3 + test + + + org.junit.jupiter + junit-jupiter + test + + + org.assertj + assertj-core + test + + + diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java new file mode 100644 index 000000000..edabaeb98 --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregatePushDownRule.java @@ -0,0 +1,198 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ArrayStats; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.interpreter.Bindables; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +/// Rewrites a whole-table `MIN`/`MAX`/`COUNT` aggregate over a [VortexTable] into a single-row +/// [LogicalValues] computed from the footer zone-map statistics — answering the query without +/// decoding a single data segment (ADR 0013 §6, ADR 0018 
Phase 2). +/// +/// Fires only when it can answer *every* aggregate from statistics: no `GROUP BY`, and each +/// call is `COUNT(*)`, `COUNT(col)`, `MIN(col)`, or `MAX(col)` over a numeric column. Anything +/// else (e.g. `SUM`, a grouped aggregate, `MIN` on a non-numeric column) leaves the plan +/// untouched for the normal scan path. `SUM`/`AVG` join this tier once the writer emits a +/// per-zone `SUM` statistic. +// Calcite 1.40 removed RelRule.Config.EMPTY; the modern RelRule.Config path requires the +// Immutables annotation processor. The classic operand() constructor is deprecated but fully +// supported and far lighter for a single adapter rule — suppression is localized and justified. +@SuppressWarnings("deprecation") +public final class VortexAggregatePushDownRule extends RelOptRule { + + /// Matches `Aggregate(Project(TableScan))` — the shape Calcite produces when columns are + /// selected before aggregation (e.g. `MIN(low)`). + public static final VortexAggregatePushDownRule WITH_PROJECT = new VortexAggregatePushDownRule( + operand(Aggregate.class, operand(Project.class, operand(TableScan.class, none()))), + "VortexAggregatePushDownRule:project"); + + /// Matches `Aggregate(TableScan)` — e.g. a bare `COUNT(*)` with no projected columns. + public static final VortexAggregatePushDownRule NO_PROJECT = new VortexAggregatePushDownRule( + operand(Aggregate.class, operand(TableScan.class, none())), + "VortexAggregatePushDownRule:scan"); + + /// Every rule variant, for registering with a planner in one call. 
+ public static final java.util.List RULES = java.util.List.of(WITH_PROJECT, NO_PROJECT); + + private VortexAggregatePushDownRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate aggregate = call.rel(0); + if (aggregate.getGroupCount() != 0) { + return; + } + // Explicit operands give concrete rels under both Hep and Volcano: rel(1) is either the + // Project (then rel(2) is the scan) or the scan directly. + Project project; + TableScan scan; + if (call.rel(1) instanceof Project p) { + project = p; + scan = call.rel(2); + } else { + project = null; + scan = call.rel(1); + } + VortexTable table = scan.getTable().unwrap(VortexTable.class); + if (table == null) { + return; + } + // Whole-table stats are only valid for a whole-table scan. If a WHERE predicate was pushed + // into the scan (BindableTableScan.filters), answering from stats would ignore it and return + // the wrong MIN/MAX/COUNT — leave the plan to compute it over the filtered rows. 
+ if (scan instanceof Bindables.BindableTableScan bindable && !bindable.filters.isEmpty()) { + return; + } + RelDataType scanRowType = scan.getRowType(); + List scanColumns = scanRowType.getFieldNames(); + + RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder(); + List outTypes = aggregate.getRowType().getFieldList().stream() + .map(f -> f.getType()).toList(); + + List row = new ArrayList<>(); + List calls = aggregate.getAggCallList(); + for (int i = 0; i < calls.size(); i++) { + RexLiteral literal = evaluate(calls.get(i), outTypes.get(i), table, scanColumns, scanRowType, + project, rexBuilder); + if (literal == null) { + return; // an aggregate we can't answer from stats — abandon the rewrite + } + row.add(literal); + } + + RelNode values = LogicalValues.create( + aggregate.getCluster(), aggregate.getRowType(), + ImmutableList.of(ImmutableList.copyOf(row))); + call.transformTo(values); + } + + /// Evaluates one aggregate call from zone-map stats, returning a literal of `outType`, or + /// `null` if it cannot be answered (so the caller abandons the rewrite). + private static RexLiteral evaluate(AggregateCall agg, RelDataType outType, VortexTable table, + List scanColumns, RelDataType scanRowType, + Project project, RexBuilder rexBuilder) { + return switch (agg.getAggregation().getKind()) { + case COUNT -> { + if (agg.getArgList().isEmpty()) { + yield exact(rexBuilder, table.totalRows(), outType); // COUNT(*) + } + String col = resolveColumn(agg.getArgList().getFirst(), scanColumns, project); + if (col == null) { + yield null; + } + Long nullCount = table.statsOf(col).nullCount(); + // COUNT(col) = rows − nulls. Without a NULL_COUNT stat we cannot assume zero nulls + // for a nullable column (we would overcount), so abandon; a non-nullable column has + // no nulls and is safe. + if (nullCount == null && isNullable(scanRowType, col)) { + yield null; + } + long nulls = nullCount == null ? 
0L : nullCount; + yield exact(rexBuilder, table.totalRows() - nulls, outType); + } + case MIN, MAX -> { + if (agg.getArgList().size() != 1) { + yield null; + } + String col = resolveColumn(agg.getArgList().getFirst(), scanColumns, project); + if (col == null) { + yield null; + } + ArrayStats stats = table.statsOf(col); + Object value = agg.getAggregation().getKind() == SqlKind.MIN ? stats.min() : stats.max(); + if (value == null) { + // No MIN/MAX stat. A genuine SQL NULL is only correct when the column provably has + // no non-null rows (empty table, or every row null); otherwise the stat is merely + // absent and we must abandon so the scan computes the real value. + long total = table.totalRows(); + Long nullCount = stats.nullCount(); + boolean provablyNoValues = total == 0 || (nullCount != null && nullCount == total); + yield provablyNoValues ? rexBuilder.makeNullLiteral(outType) : null; + } + yield numericLiteral(rexBuilder, value, outType); + } + default -> null; + }; + } + + /// Returns whether column `col` is nullable per the scan's row type. + private static boolean isNullable(RelDataType scanRowType, String col) { + RelDataTypeField field = scanRowType.getField(col, true, false); + return field == null || field.getType().isNullable(); + } + + /// Maps an aggregate input ordinal to a scan column name, looking through a `Project` of input + /// refs when present. Returns `null` if the ordinal is a computed expression, not a column. + private static String resolveColumn(int aggInput, List scanColumns, Project project) { + if (project == null) { + return aggInput < scanColumns.size() ? 
scanColumns.get(aggInput) : null; + } + RexNode expr = project.getProjects().get(aggInput); + if (expr instanceof RexInputRef ref && ref.getIndex() < scanColumns.size()) { + return scanColumns.get(ref.getIndex()); + } + return null; + } + + private static RexLiteral exact(RexBuilder rexBuilder, long value, RelDataType type) { + return rexBuilder.makeExactLiteral(BigDecimal.valueOf(value), type); + } + + /// Builds a literal for a non-null `MIN`/`MAX` value, supporting only numeric output types + /// (exact and approximate). A non-numeric value yields `null` so the rule abandons the rewrite. + private static RexLiteral numericLiteral(RexBuilder rexBuilder, Object value, RelDataType type) { + if (!(value instanceof Number number)) { + return null; + } + return switch (type.getSqlTypeName()) { + case TINYINT, SMALLINT, INTEGER, BIGINT -> + rexBuilder.makeExactLiteral(BigDecimal.valueOf(number.longValue()), type); + case FLOAT, REAL, DOUBLE, DECIMAL -> + rexBuilder.makeApproxLiteral(BigDecimal.valueOf(number.doubleValue()), type); + default -> null; + }; + } +} diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java new file mode 100644 index 000000000..a9f47ae5d --- /dev/null +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexAggregates.java @@ -0,0 +1,125 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ArrayStats; +import io.github.dfa1.vortex.reader.Chunk; +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; +import io.github.dfa1.vortex.reader.array.Array; +import io.github.dfa1.vortex.reader.array.DoubleArray; +import io.github.dfa1.vortex.reader.array.FloatArray; +import io.github.dfa1.vortex.reader.array.IntArray; +import io.github.dfa1.vortex.reader.array.LongArray; + +/// Column aggregates answered with the 
cheapest available source.
+///
+/// `MIN` / `MAX` / `COUNT` are read from the per-segment zone-map statistics embedded in the
+/// file footer — no data segment is decoded. `SUM` (and therefore `AVG`) has no zone statistic
+/// in the current writer, so it falls back to a streaming scan. This split is the whole point
+/// of the demo: the stats-backed aggregates are effectively free, and `SUM`/`AVG` show what the
+/// next writer increment (emit a per-zone `SUM`) would make free too (ADR 0013 §6).
+public final class VortexAggregates {
+
+    /// Where an aggregate's value came from.
+    public enum Source {
+        /// Read from zone-map statistics in the footer; no data decoded.
+        ZONE_STATS_PUSHDOWN,
+        /// Computed by streaming the column's data segments (no usable statistic).
+        FULL_SCAN
+    }
+
+    /// A column's aggregate summary plus where each part was sourced.
+    ///
+    /// `sum` is a [Long] for integer columns (exact, matching SQL `SUM(BIGINT)` which also wraps
+    /// at 2^63) and a [Double] for floating-point columns — never a `double` accumulation of
+    /// integers, which would lose precision past 2^53.
+    ///
+    /// @param column the column name
+    /// @param min minimum value (boxed `Double`/`Long`), or `null` if unknown
+    /// @param max maximum value, or `null` if unknown
+    /// @param count count of non-null rows (total rows minus the NULL_COUNT statistic)
+    /// @param sum sum of values (`Long` for integer columns, `Double` for floats), or `null`
+    /// @param avg mean (`sum / count`), or `null` if not computed
+    /// @param minMaxSource source of `min`/`max`/`count`
+    /// @param sumSource source of `sum`/`avg`
+    public record Summary(String column, Object min, Object max, long count, Number sum, Double avg,
+                          Source minMaxSource, Source sumSource) {
+    }
+
+    private VortexAggregates() {
+    }
+
+    /// Computes `MIN`/`MAX`/`COUNT`/`SUM`/`AVG` for a numeric column, pushing down what the zone
+    /// statistics allow and scanning only for `SUM`/`AVG`.
+    ///
+    /// @param reader an open reader over the file
+    /// @param column the numeric column name
+    /// @return the column's aggregate summary; `avg` is `null` when `count` is zero
+    public static Summary of(VortexReader reader, String column) {
+        ArrayStats stats = reader.columnStats().getOrDefault(column, ArrayStats.empty());
+        long totalRows = totalRows(reader);
+        long nullCount = stats.nullCount() == null ? 0L : stats.nullCount(); // absent stat is treated as "no nulls" — NOTE(review): may overcount on a nullable column; verify
+        long count = totalRows - nullCount;
+
+        // MIN/MAX/COUNT: straight from footer zone-map stats, no data segment touched.
+        Object min = stats.min();
+        Object max = stats.max();
+
+        // SUM/AVG: no SUM zone-stat exists today, so stream the column once. Integer columns sum
+        // into a long (exact); floating columns into a double.
+        Number sum = scanSum(reader, column);
+        Double avg = count == 0 ? null : sum.doubleValue() / count; // no non-null rows: no mean
+
+        return new Summary(column, min, max, count, sum, avg,
+                Source.ZONE_STATS_PUSHDOWN, Source.FULL_SCAN);
+    }
+
+    private static long totalRows(VortexReader reader) { // adds up the scan's chunkRowCounts(); no chunk is iterated
+        try (ScanIterator scan = reader.scan(ScanOptions.all())) {
+            long total = 0L;
+            for (long c : scan.chunkRowCounts()) {
+                total += c;
+            }
+            return total;
+        }
+    }
+
+    private static Number scanSum(VortexReader reader, String column) { // streams only `column`; result is a Long unless a float/double chunk was seen
+        long longSum = 0L;
+        double doubleSum = 0.0;
+        boolean isFloating = false;
+        try (ScanIterator scan = reader.scan(ScanOptions.columns(column))) {
+            while (scan.hasNext()) {
+                try (Chunk chunk = scan.next()) {
+                    long n = chunk.rowCount(); // every slot 0..n-1 is added — NOTE(review): confirm what the getters return for null slots
+                    switch (chunk.column(column)) {
+                        case LongArray a -> {
+                            for (long i = 0; i < n; i++) {
+                                longSum += a.getLong(i); // plain +=, so overflow wraps (as the Summary doc states)
+                            }
+                        }
+                        case IntArray a -> {
+                            for (long i = 0; i < n; i++) {
+                                longSum += a.getInt(i);
+                            }
+                        }
+                        case DoubleArray a -> {
+                            isFloating = true;
+                            for (long i = 0; i < n; i++) {
+                                doubleSum += a.getDouble(i);
+                            }
+                        }
+                        case FloatArray a -> {
+                            isFloating = true;
+                            for (long i = 0; i < n; i++) {
+                                doubleSum += a.getFloat(i);
+                            }
+                        }
+                        default -> throw new IllegalArgumentException("not a numeric column: " + column);
+                    }
+                }
+            }
+        }
+        return isFloating ? Double.valueOf(doubleSum) : Long.valueOf(longSum); // a scan that yields no chunks returns Long 0, whatever the column type
+    }
+}
diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java
new file mode 100644
index 000000000..df8be495b
--- /dev/null
+++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexSchema.java
@@ -0,0 +1,48 @@
+package io.github.dfa1.vortex.calcite;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+/// A Calcite schema exposing one or more Vortex files as SQL tables.
+///
+/// Each entry maps a SQL table name to the `.vortex` file backing it:
+///
+/// ```java
+/// rootSchema.add("vtx", new VortexSchema(Map.of("ohlc", path)));
+/// // SELECT symbol, max(high) FROM vtx.ohlc GROUP BY symbol
+/// ```
+public final class VortexSchema extends AbstractSchema {
+
+    private final Map tables; // unmodifiable, built once in the constructor — NOTE(review): generic arguments appear stripped in this view; confirm against the real file
+
+    /// Creates a schema from a map of SQL table name to backing Vortex file.
+    ///
+    /// @param files map from table name to the `.vortex` file path
+    public VortexSchema(Map files) {
+        this.tables = files.entrySet().stream()
+                .collect(java.util.stream.Collectors.toUnmodifiableMap(
+                        Map.Entry::getKey, e -> new VortexTable(e.getValue()))); // one VortexTable per entry; only the path is stored here
+    }
+
+    @Override
+    protected Map getTableMap() {
+        return tables;
+    }
+
+    /// Returns the [VortexTable] registered under `name` — useful for reading push-down
+    /// instrumentation such as [VortexTable#chunksScannedLastQuery()].
+    ///
+    /// @param name the SQL table name
+    /// @return the backing table
+    /// @throws IllegalArgumentException if no table is registered under `name`
+    public VortexTable table(String name) {
+        Table t = tables.get(name);
+        if (!(t instanceof VortexTable vortexTable)) { // also taken when t is null, i.e. the name is unknown
+            throw new IllegalArgumentException("no Vortex table named: " + name);
+        }
+        return vortexTable;
+    }
+}
diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java
new file mode 100644
index 000000000..65fa352e1
--- /dev/null
+++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java
@@ -0,0 +1,319 @@
+package io.github.dfa1.vortex.calcite;
+
+import io.github.dfa1.vortex.core.model.DType;
+import io.github.dfa1.vortex.reader.Chunk;
+import io.github.dfa1.vortex.reader.RowFilter;
+import io.github.dfa1.vortex.reader.ScanIterator;
+import io.github.dfa1.vortex.reader.ScanOptions;
+import io.github.dfa1.vortex.reader.VortexReader;
+import io.github.dfa1.vortex.reader.array.BoolArray;
+import io.github.dfa1.vortex.reader.array.DoubleArray;
+import io.github.dfa1.vortex.reader.array.FloatArray;
+import io.github.dfa1.vortex.reader.array.IntArray;
+import io.github.dfa1.vortex.reader.array.LongArray;
+import io.github.dfa1.vortex.reader.array.VarBinArray;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import
org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +/// A single Vortex file exposed to Calcite as a flat SQL table with column projection and +/// zone-map filter push-down. +/// +/// Projection (`projects`) is honoured exactly — only the requested columns are decoded and +/// returned. Filters (`filters`) that translate to a [RowFilter] are pushed into the scan for +/// *chunk skipping* via zone-map statistics, but are **left in Calcite's list** rather than +/// consumed: zone-map pruning is approximate (it drops whole chunks that cannot match, not +/// individual rows), so Calcite must still apply the predicate row-by-row for exactness. The +/// win is decoding far fewer chunks when the filter is selective on a clustered column. +public final class VortexTable extends AbstractTable implements ProjectableFilterableTable { + + /// Used only to expand `SEARCH`/`Sarg` predicates (e.g. `BETWEEN`, `IN`) back into ordinary + /// comparison trees the [RowFilter] translation understands. + private static final RexBuilder REX_BUILDER = new RexBuilder(new JavaTypeFactoryImpl()); + + private final Path file; + private final AtomicLong chunksScannedLastQuery = new AtomicLong(); + + /// Creates a table backed by the Vortex file at `file`. + /// + /// @param file path to the `.vortex` file + public VortexTable(Path file) { + this.file = file; + } + + /// Number of chunks actually decoded by the most recent [#scan] — the rest were pruned by + /// zone-map statistics. Used by the demo to show push-down skipping work. 
+ /// + /// @return chunks decoded in the last query + public long chunksScannedLastQuery() { + return chunksScannedLastQuery.get(); + } + + /// Per-column zone-map statistics (global min/max and null count), read from the footer + /// without decoding data. Used by the aggregate push-down rule to answer `MIN`/`MAX`/`COUNT`. + /// + /// @param column the column name + /// @return the column's aggregated statistics + public io.github.dfa1.vortex.reader.ArrayStats statsOf(String column) { + try (VortexReader reader = VortexReader.open(file)) { + return reader.columnStats().getOrDefault(column, io.github.dfa1.vortex.reader.ArrayStats.empty()); + } catch (IOException e) { + throw new UncheckedIOException("cannot read stats of " + file, e); + } + } + + /// Total row count across all chunks, read from chunk metadata without decoding data. + /// + /// @return the number of rows in the file + public long totalRows() { + try (VortexReader reader = VortexReader.open(file); + ScanIterator scan = reader.scan(ScanOptions.all())) { + long total = 0; + for (long c : scan.chunkRowCounts()) { + total += c; + } + return total; + } catch (IOException e) { + throw new UncheckedIOException("cannot count rows of " + file, e); + } + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + DType.Struct struct = struct(); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < struct.fieldNames().size(); i++) { + builder.add(struct.fieldNames().get(i), toSqlType(typeFactory, struct.fieldTypes().get(i))); + } + return builder.build(); + } + + @Override + public Enumerable scan(DataContext root, List filters, int[] projects) { + DType.Struct struct = struct(); + List allNames = struct.fieldNames(); + + // Projection: the columns to decode and emit, in the order Calcite asked for. A null + // projects array means "all columns". + int[] cols = projects != null ? 
projects : allColumns(allNames.size()); + String[] outNames = new String[cols.length]; + DType[] outTypes = new DType[cols.length]; + for (int i = 0; i < cols.length; i++) { + outNames[i] = allNames.get(cols[i]); + outTypes[i] = struct.fieldTypes().get(cols[i]); + } + + // Filter push-down: build a RowFilter from the predicates we understand (for chunk skip). + // Do NOT remove anything from `filters` — pruning is approximate, Calcite re-checks rows. + Optional pushed = toRowFilter(filters, allNames, struct.fieldTypes()); + + // The scan must include any column the filter prunes on, even when it is not projected — + // chunk pruning reads that column's zone-map stats. Output still emits only outNames. + java.util.LinkedHashSet scanColumns = new java.util.LinkedHashSet<>(List.of(outNames)); + ScanOptions options; + if (pushed.isPresent()) { + collectColumns(pushed.get(), scanColumns); + options = ScanOptions.columns(scanColumns.toArray(String[]::new)).withFilter(pushed.get()); + } else { + options = ScanOptions.columns(outNames); + } + + List rows = new ArrayList<>(); + long scanned = 0; + try (VortexReader reader = VortexReader.open(file); + ScanIterator scan = reader.scan(options)) { + while (scan.hasNext()) { + try (Chunk chunk = scan.next()) { + scanned++; + appendChunk(chunk, outNames, outTypes, rows); + } + } + } catch (IOException e) { + throw new UncheckedIOException("cannot scan " + file, e); + } + chunksScannedLastQuery.set(scanned); + return Linq4j.asEnumerable(rows); + } + + private DType.Struct struct() { + try (VortexReader reader = VortexReader.open(file)) { + if (!(reader.dtype() instanceof DType.Struct s)) { + throw new IllegalStateException("top-level Vortex dtype is not a struct: " + reader.dtype()); + } + return s; + } catch (IOException e) { + throw new UncheckedIOException("cannot read schema of " + file, e); + } + } + + private static int[] allColumns(int n) { + int[] all = new int[n]; + for (int i = 0; i < n; i++) { + all[i] = i; + } + return 
all; + } + + private static void appendChunk(Chunk chunk, String[] names, DType[] types, List rows) { + long n = chunk.rowCount(); + Object[] arrays = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + arrays[c] = chunk.column(names[c]); + } + for (long r = 0; r < n; r++) { + Object[] row = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + row[c] = value(arrays[c], types[c], r); + } + rows.add(row); + } + } + + private static Object value(Object array, DType type, long r) { + return switch (type) { + case DType.Primitive p -> switch (p.ptype()) { + case F64 -> ((DoubleArray) array).getDouble(r); + case F32 -> (double) ((FloatArray) array).getFloat(r); + case I64, U64 -> ((LongArray) array).getLong(r); + case I32, U32, I16, U16, I8, U8 -> ((IntArray) array).getInt(r); + default -> throw new IllegalStateException("unsupported ptype: " + p.ptype()); + }; + case DType.Utf8 _ -> ((VarBinArray) array).getString(r); + case DType.Bool _ -> ((BoolArray) array).getBoolean(r); + default -> throw new IllegalStateException("unsupported column dtype: " + type); + }; + } + + /// Translates the Calcite predicates into a zone-map [RowFilter], keeping only the comparisons + /// we can prune on (`=`, `<>`, `<`, `<=`, `>`, `>=` between a column and a literal, plus `AND`). + /// Predicates we don't understand are simply not pushed — Calcite still applies them. + private static Optional toRowFilter(List filters, List names, List types) { + List pushed = new ArrayList<>(); + for (RexNode node : filters) { + // Calcite encodes BETWEEN / IN / range unions as SEARCH(ref, Sarg); expand back to a + // comparison tree (>=, <=, AND, OR) before translating. 
+ RexNode expanded = RexUtil.expandSearch(REX_BUILDER, null, node); + toComparison(expanded, names, types).ifPresent(pushed::add); + } + if (pushed.isEmpty()) { + return Optional.empty(); + } + if (pushed.size() == 1) { + return Optional.of(pushed.getFirst()); + } + return Optional.of(RowFilter.and(pushed.toArray(RowFilter[]::new))); + } + + /// Collects every column the filter references into `out`, so the scan can include them for + /// zone-map pruning regardless of projection. + private static void collectColumns(RowFilter filter, java.util.Set out) { + switch (filter) { + case RowFilter.And(var parts) -> parts.forEach(f -> collectColumns(f, out)); + case RowFilter.Eq(var col, var ignored) -> out.add(col); + case RowFilter.Neq(var col, var ignored) -> out.add(col); + case RowFilter.Gt(var col, var ignored) -> out.add(col); + case RowFilter.Gte(var col, var ignored) -> out.add(col); + case RowFilter.Lt(var col, var ignored) -> out.add(col); + case RowFilter.Lte(var col, var ignored) -> out.add(col); + case RowFilter.IsNull(var col) -> out.add(col); + case RowFilter.IsNotNull(var col) -> out.add(col); + } + } + + private static Optional toComparison(RexNode node, List names, List types) { + if (!(node instanceof RexCall call)) { + return Optional.empty(); + } + return switch (call.getKind()) { + case AND -> { + List parts = new ArrayList<>(); + for (RexNode operand : call.getOperands()) { + toComparison(operand, names, types).ifPresent(parts::add); + } + yield parts.isEmpty() ? 
Optional.empty() + : Optional.of(RowFilter.and(parts.toArray(RowFilter[]::new))); + } + case EQUALS, NOT_EQUALS, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL -> + binary(call, names, types); + default -> Optional.empty(); + }; + } + + private static Optional binary(RexCall call, List names, List types) { + List ops = call.getOperands(); + if (ops.size() != 2 || !(ops.get(0) instanceof RexInputRef ref) || !(ops.get(1) instanceof RexLiteral lit)) { + return Optional.empty(); + } + Object val = literalValue(lit, types.get(ref.getIndex())); + if (val == null) { + return Optional.empty(); + } + String col = names.get(ref.getIndex()); + Comparable cmp = (Comparable) val; + return Optional.of(switch (call.getKind()) { + case EQUALS -> RowFilter.eq(col, val); + case NOT_EQUALS -> RowFilter.neq(col, val); + case LESS_THAN -> RowFilter.lt(col, cmp); + case LESS_THAN_OR_EQUAL -> RowFilter.lte(col, cmp); + case GREATER_THAN -> RowFilter.gt(col, cmp); + case GREATER_THAN_OR_EQUAL -> RowFilter.gte(col, cmp); + default -> throw new IllegalStateException("unreachable kind: " + call.getKind()); + }); + } + + /// Coerces a SQL literal to the Java type the column's zone-map statistics are stored as, so + /// the comparison in [RowFilter] is type-compatible. Integer scalars are stored as `Long` and + /// floats as `Double` in the stats, regardless of the column's physical width — a mismatched + /// boxed type silently disables pruning (the comparator swallows the `ClassCastException`). + /// Returns `null` for unsupported columns. 
+ private static Object literalValue(RexLiteral lit, DType type) { + return switch (type) { + case DType.Utf8 _ -> lit.getValueAs(String.class); + case DType.Primitive p -> switch (p.ptype()) { + case F64, F32 -> lit.getValueAs(Double.class); + case I64, U64, I32, U32, I16, U16, I8, U8 -> { + // A fractional or out-of-range literal must not be truncated to a Long: pushing + // `col < 10.5` as `col < 10` could let zone maps skip a chunk whose minimum is 10. + // Such literals are not pushed down; Calcite evaluates the predicate itself. + Long whole = lit.getValueAs(Long.class); + Double exact = lit.getValueAs(Double.class); + yield whole != null && exact != null && whole.doubleValue() == exact ? whole : null; + } + default -> null; + }; + default -> null; + }; + } + + private static RelDataType toSqlType(RelDataTypeFactory factory, DType type) { + RelDataType sql = switch (type) { + case DType.Primitive p -> switch (p.ptype()) { + case F64 -> factory.createSqlType(SqlTypeName.DOUBLE); + case F32 -> factory.createSqlType(SqlTypeName.REAL); + case I64, U64 -> factory.createSqlType(SqlTypeName.BIGINT); + case I32, U32 -> factory.createSqlType(SqlTypeName.INTEGER); + case I16, U16 -> factory.createSqlType(SqlTypeName.SMALLINT); + case I8, U8 -> factory.createSqlType(SqlTypeName.TINYINT); + default -> throw new IllegalStateException("unsupported ptype: " + p.ptype()); + }; + case DType.Utf8 _ -> factory.createSqlType(SqlTypeName.VARCHAR); + case DType.Bool _ -> factory.createSqlType(SqlTypeName.BOOLEAN); + default -> throw new IllegalStateException("unsupported column dtype: " + type); + }; + return factory.createTypeWithNullability(sql, type.nullable()); + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java new file mode 100644 index 000000000..e004ecef5 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/AggregatePushDownTest.java @@ -0,0 +1,190 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.VortexReader; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; +import
org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Phase 2: proves [VortexAggregatePushDownRule] rewrites a whole-table `MIN`/`MAX`/`COUNT` query +/// into a single-row `Values` computed from zone-map statistics — no table scan in the final plan. +class AggregatePushDownTest { + + private static final int ROWS = 200_000; + private static final int CHUNK = 10_000; + + @TempDir + static Path tmp; + private static Path file; + + @BeforeAll + static void writeFile() throws Exception { + file = tmp.resolve("ohlc.vortex"); + OhlcGenerator.write(file, ROWS, CHUNK); + } + + @Test + void minMaxCountRewriteToValuesFromStats() throws Exception { + // Given a planner over the OHLC schema and a whole-table MIN/MAX/COUNT query + SchemaPlus root = Frameworks.createRootSchema(true); + SchemaPlus vtx = root.add("vtx", new VortexSchema(Map.of("ohlc", file))); + FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(vtx) + // Preserve identifier case so unquoted `ohlc` matches the registered table name. 
+ .parserConfig(org.apache.calcite.sql.parser.SqlParser.config() + .withUnquotedCasing(org.apache.calcite.avatica.util.Casing.UNCHANGED)) + .build(); + Planner planner = Frameworks.getPlanner(config); + SqlNode parsed = planner.parse("select min(low), max(high), count(*) from ohlc"); + RelNode logical = planner.rel(planner.validate(parsed)).rel; + + // When the aggregate push-down rule runs + HepProgram program = new HepProgramBuilder() + .addRuleCollection(VortexAggregatePushDownRule.RULES) + .build(); + HepPlanner hep = new HepPlanner(program); + hep.setRoot(logical); + RelNode optimized = hep.findBestExp(); + + String plan = RelOptUtil.toString(optimized); + System.out.println(); + System.out.println("Aggregate push-down — optimized plan:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // Then the plan is a single-row Values with no scan or aggregate left + assertThat(plan).contains("LogicalValues").doesNotContain("TableScan").doesNotContain("Aggregate"); + + Values values = findValues(optimized); + assertThat(values).isNotNull(); + List ohlcRow = values.getTuples().getFirst(); + + // And the literal values equal what the stats say (no data was decoded to produce them) + try (VortexReader reader = VortexReader.open(file)) { + VortexAggregates.Summary low = VortexAggregates.of(reader, "low"); + VortexAggregates.Summary high = VortexAggregates.of(reader, "high"); + assertThat(ohlcRow.get(0).getValueAs(Double.class)).isEqualTo(((Number) low.min()).doubleValue()); + assertThat(ohlcRow.get(1).getValueAs(Double.class)).isEqualTo(((Number) high.max()).doubleValue()); + assertThat(ohlcRow.get(2).getValueAs(Long.class)).isEqualTo((long) ROWS); + } + } + + @Test + @SuppressWarnings("try") // the Hook.Closeable is used only for its scope (deregister on close) + void pushDownRunsEndToEndThroughJdbcPlanner() throws Exception { + // Given a JDBC Calcite connection over the OHLC schema with the rule registered on the + // (Volcano) planner via Hook.PLANNER + 
Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + String sql = "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + try (Hook.Closeable ignored = Hook.PLANNER.addThread( + (Consumer) planner -> VortexAggregatePushDownRule.RULES.forEach(planner::addRule)); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + // When EXPLAIN is run, the chosen plan answers from stats — a Values, no scan/aggregate + String plan; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("explain plan for " + sql)) { + rs.next(); + plan = rs.getString(1); + } + System.out.println(); + System.out.println("JDBC aggregate push-down — chosen plan:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // And the query executes, producing the stats-derived row + double lo; + double hi; + long c; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + rs.next(); + lo = rs.getDouble("lo"); + hi = rs.getDouble("hi"); + c = rs.getLong("c"); + } + + // Then the plan touched no table scan and the values match the zone-map stats + assertThat(plan).containsIgnoringCase("Values").doesNotContain("TableScan").doesNotContain("Aggregate"); + try (VortexReader reader = VortexReader.open(file)) { + assertThat(lo).isEqualTo(((Number) VortexAggregates.of(reader, "low").min()).doubleValue()); + assertThat(hi).isEqualTo(((Number) VortexAggregates.of(reader, "high").max()).doubleValue()); + } + assertThat(c).isEqualTo(ROWS); + } + } + + @Test + @SuppressWarnings("try") // the Hook.Closeable is used only for its scope (deregister on close) + void filteredAggregateIsNotAnsweredFromWholeTableStats() throws Exception { + // Given the rule on the JDBC (Volcano) planner and a WHERE that pushes a predicate into the + // scan — whole-table stats must not be used to answer a filtered 
aggregate + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + try (Hook.Closeable ignored = Hook.PLANNER.addThread( + (Consumer) planner -> VortexAggregatePushDownRule.RULES.forEach(planner::addRule)); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + // When MIN(low) is taken over only the rows with low > 10 + double min; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("select min(low) lo from vtx.ohlc where low > 10")) { + rs.next(); + min = rs.getDouble("lo"); + } + + // Then it is the filtered minimum (> 10), not the whole-table MIN (≈ 0.x) the rule would + // wrongly return if it ignored the pushed-down filter + assertThat(min).isGreaterThan(10.0); + } + } + + // Note: the absent-MIN/MAX-stat and absent-NULL_COUNT guards in VortexAggregatePushDownRule are + // defensive for files whose writer omits per-column stats (e.g. some Rust-written columns or + // future stat-less encodings). The in-house Java writer always emits min/max/null_count for + // primitive columns (verified), so those abandon paths cannot be exercised from this module. 
+ + private static Values findValues(RelNode node) { + if (node instanceof Values v) { + return v; + } + for (RelNode input : node.getInputs()) { + Values found = findValues(input); + if (found != null) { + return found; + } + } + return null; + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java new file mode 100644 index 000000000..88f177654 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteSmokeTest.java @@ -0,0 +1,41 @@ +package io.github.dfa1.vortex.calcite; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/// De-risk gate: proves Apache Calcite's Enumerable convention (runtime Java codegen +/// compiled by Janino) works on this project's JDK 25 target before any adapter is built. +/// +/// The `VALUES ... WHERE` query forces Calcite to generate and Janino-compile an +/// Enumerable program. If Janino cannot emit JDK 25 class files this test fails at +/// execution time, which is the signal to fall back to Calcite's interpreter convention. 
+class CalciteSmokeTest { + + @Test + void calciteEnumerableCodegenRunsOnJdk25() throws Exception { + // Given a Calcite JDBC connection (lex/parser defaults are enough for a literal query) + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + + // When a query that forces Enumerable codegen + Janino compilation is executed + int sum = 0; + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info); + Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery( + "select id from (values (1), (2), (3)) as t(id) where id > 1")) { + while (rs.next()) { + sum += rs.getInt("id"); + } + } + + // Then the rows survive the generated+compiled pipeline (2 + 3) + assertThat(sum).isEqualTo(5); + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java new file mode 100644 index 000000000..0aa0284b7 --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcGenerator.java @@ -0,0 +1,38 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.core.testing.OhlcData; +import io.github.dfa1.vortex.writer.VortexWriter; +import io.github.dfa1.vortex.writer.WriteOptions; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.LinkedHashMap; +import java.util.Map; + +/// Writes the shared [OhlcData] batches to a Vortex file with the in-house Java writer, using a +/// small chunk size so the file carries several zones (making zone-map push-down visible). 
+final class OhlcGenerator { + + private OhlcGenerator() { + } + + static void write(Path file, int totalRows, int chunkSize) throws IOException { + WriteOptions opts = new WriteOptions(chunkSize, true, 0.90, 0, true, false); + try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); + var writer = VortexWriter.create(ch, OhlcData.SCHEMA, opts)) { + for (OhlcData.Batch batch : OhlcData.generate(totalRows, chunkSize)) { + Map columns = new LinkedHashMap<>(); + columns.put("date", batch.date()); + columns.put("symbol", batch.symbol()); + columns.put("open", batch.open()); + columns.put("high", batch.high()); + columns.put("low", batch.low()); + columns.put("close", batch.close()); + columns.put("volume", batch.volume()); + writer.writeChunk(columns); + } + } + } +} diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java new file mode 100644 index 000000000..b3689532d --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/OhlcSqlDemoTest.java @@ -0,0 +1,264 @@ +package io.github.dfa1.vortex.calcite; + +import io.github.dfa1.vortex.reader.ScanIterator; +import io.github.dfa1.vortex.reader.ScanOptions; +import io.github.dfa1.vortex.reader.VortexReader; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Demo: SQL over a 1M-row OHLC Vortex file via the Calcite adapter. 
+/// +/// [#aggregatesMatchAndPushdownStaysFlatOverRepeatedQueries] compares the Calcite full scan +/// against Vortex zone-map push-down; [#variousQueryShapes] exercises GROUP BY / WHERE / HAVING +/// / ORDER BY+LIMIT / IN to show the adapter handles real SQL. Those shapes currently run as +/// full scans — they are exactly what Phase 1 (filter/project push-down) and Phase 2 (aggregate +/// push-down) in ADR 0018 will accelerate. +class OhlcSqlDemoTest { + + private static final int ROWS = 1_000_000; + private static final int CHUNK = 10_000; + private static final int WARMUP = 3; + private static final int ITERATIONS = 10; + + // MIN/MAX/COUNT only — the aggregates Vortex answers purely from zone-map stats, with no data + // segment decoded. SUM/AVG are deliberately left out: there is no per-zone SUM stat yet, so they + // would force a full scan and hide the real push-down cost (sub-millisecond here). + private static final String AGG_SQL = + "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + @TempDir + static Path tmp; + private static Path file; + + private record SqlAggs(double minLow, double maxHigh, long count) { + } + + private record Pushdown(Object minLow, Object maxHigh, long count) { + } + + @BeforeAll + static void writeFile() throws Exception { + file = tmp.resolve("ohlc.vortex"); + OhlcGenerator.write(file, ROWS, CHUNK); + } + + @Test + void aggregatesMatchAndPushdownStaysFlatOverRepeatedQueries() throws Exception { + // Given the shared OHLC file behind a Calcite schema + try (Connection conn = connect()) { + // Warm up both paths so timings reflect steady-state JIT, not first-call compile. 
+ for (int i = 0; i < WARMUP; i++) { + runSql(conn); + runPushdown(file); + } + + // When each path runs ITERATIONS times, accumulating wall time + long sqlNanos = 0; + long pushdownNanos = 0; + SqlAggs sql = null; + Pushdown push = null; + for (int i = 0; i < ITERATIONS; i++) { + long a = System.nanoTime(); + sql = runSql(conn); + sqlNanos += System.nanoTime() - a; + + long b = System.nanoTime(); + push = runPushdown(file); + pushdownNanos += System.nanoTime() - b; + } + + printTiming(push, sqlNanos, pushdownNanos); + + // Then push-down min/max/count match the full-scan ground truth exactly + assertThat(((Number) push.minLow()).doubleValue()).isEqualTo(sql.minLow()); + assertThat(((Number) push.maxHigh()).doubleValue()).isEqualTo(sql.maxHigh()); + assertThat(push.count()).isEqualTo(sql.count()); + } + } + + @Test + void variousQueryShapes() throws Exception { + // Given the shared OHLC file behind a Calcite schema + try (Connection conn = connect()) { + // When / Then a spread of SQL shapes all run and return sensible results. + + // GROUP BY + ORDER BY + LIMIT: the five most-traded tickers. + long topRows = printAndCount(conn, "top 5 tickers by total volume", + "select symbol, count(*) days, max(high) hi, sum(volume) vol " + + "from vtx.ohlc group by symbol order by vol desc limit 5"); + assertThat(topRows).isEqualTo(5); + + // WHERE with a row-level predicate: how many 'up' days (close above open). + long upDays; + try (Statement st = conn.createStatement(); + // `close` and `open` are SQL reserved words (CLOSE/OPEN cursor) — must be quoted. + ResultSet rs = st.executeQuery("select count(*) c from vtx.ohlc where `close` > `open`")) { + rs.next(); + upDays = rs.getLong("c"); + } + System.out.printf("%n[up days] close > open: %,d of %,d rows%n", upDays, (long) ROWS); + assertThat(upDays).isBetween(0L, (long) ROWS); + + // WHERE IN + GROUP BY: average volume for a watchlist. 
+ long watchRows = printAndCount(conn, "avg volume for AAPL/MSFT/NVDA", + "select symbol, avg(cast(volume as double)) av from vtx.ohlc " + + "where symbol in ('AAPL', 'MSFT', 'NVDA') group by symbol order by symbol"); + assertThat(watchRows).isEqualTo(3); + + // HAVING: tickers whose lifetime volume clears a threshold. + long bigRows = printAndCount(conn, "tickers with sum(volume) > 33.35e9", + "select symbol, sum(volume) v from vtx.ohlc group by symbol " + + "having sum(volume) > 33350000000 order by v desc"); + assertThat(bigRows).isBetween(1L, 29L); + + // ORDER BY + LIMIT on raw rows: the three highest single-day highs. + long peakRows = printAndCount(conn, "3 highest single-day highs", + "select symbol, high, volume from vtx.ohlc order by high desc limit 3"); + assertThat(peakRows).isEqualTo(3); + } + } + + @Test + void filterPushDownPrunesChunksAndShowsInExplain() throws Exception { + // Given the OHLC file (date is strictly increasing → zone-map prunable) behind a schema + VortexSchema schema = new VortexSchema(Map.of("ohlc", file)); + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(CalciteConnection.class).getRootSchema().add("vtx", schema); + + // A narrow date window: ~101 days out of ~33,333, landing in a single 10k-row chunk. 
+ int startDay = (int) java.time.LocalDate.of(2020, 1, 2).toEpochDay(); + int lo = startDay + 10_000; + int hi = startDay + 10_100; + String windowed = "select count(*) c, max(high) h from vtx.ohlc " + + "where `date` between " + lo + " and " + hi; + + // When the unfiltered query runs, every chunk is decoded + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("select count(*) c from vtx.ohlc")) { + rs.next(); + } + long chunksFull = schema.table("ohlc").chunksScannedLastQuery(); + + // And the date-windowed query runs, pruning chunks via zone-map stats + long windowCount; + double windowMaxHigh; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(windowed)) { + rs.next(); + windowCount = rs.getLong("c"); + windowMaxHigh = rs.getDouble("h"); + } + long chunksPruned = schema.table("ohlc").chunksScannedLastQuery(); + + // And EXPLAIN shows the predicate folded into the scan (push-down landed) + String plan; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("explain plan for " + windowed)) { + rs.next(); + plan = rs.getString(1); + } + + System.out.printf("%n[filter push-down] date in [%d, %d]%n", lo, hi); + System.out.printf(" rows matched: %,d | max(high) in window: %.2f%n", windowCount, windowMaxHigh); + System.out.printf(" chunks decoded: %d (windowed) vs %d (full) — %.0f%% skipped by zone maps%n", + chunksPruned, chunksFull, 100.0 * (chunksFull - chunksPruned) / chunksFull); + System.out.println(" EXPLAIN:"); + plan.lines().forEach(l -> System.out.println(" " + l)); + + // Then the windowed scan touched far fewer chunks than the full scan, exact result intact + assertThat(chunksFull).isEqualTo(ROWS / CHUNK); + assertThat(chunksPruned).isLessThanOrEqualTo(3); + assertThat(windowCount).isBetween(2900L, 3100L); + assertThat(windowMaxHigh).isLessThanOrEqualTo(5347.11); + // And the plan is a Bindable scan carrying pushed filters + 
assertThat(plan).contains("BindableTableScan").contains("filters"); + } + } + + private static Connection connect() throws Exception { + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + Connection conn = DriverManager.getConnection("jdbc:calcite:", info); + conn.unwrap(CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + return conn; + } + + private static SqlAggs runSql(Connection conn) throws Exception { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(AGG_SQL)) { + rs.next(); + return new SqlAggs(rs.getDouble("lo"), rs.getDouble("hi"), rs.getLong("c")); + } + } + + /// Push-down path: read MIN(low)/MAX(high) straight from footer zone-map stats and COUNT(*) from + /// chunk metadata — no data segment is decoded. + private static Pushdown runPushdown(Path file) throws Exception { + try (VortexReader reader = VortexReader.open(file)) { + var stats = reader.columnStats(); + long total = 0; + try (ScanIterator scan = reader.scan(ScanOptions.all())) { + for (long c : scan.chunkRowCounts()) { + total += c; + } + } + return new Pushdown(stats.get("low").min(), stats.get("high").max(), total); + } + } + + /// Runs a query, prints every row as a labelled table, and returns the row count. 
+ private static long printAndCount(Connection conn, String title, String sql) throws Exception { + System.out.printf("%n[%s]%n", title); + long rows = 0; + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + StringBuilder header = new StringBuilder(" "); + for (int c = 1; c <= cols; c++) { + header.append(String.format("%-16s", md.getColumnLabel(c))); + } + System.out.println(header); + while (rs.next()) { + StringBuilder line = new StringBuilder(" "); + for (int c = 1; c <= cols; c++) { + line.append(String.format("%-16s", rs.getObject(c))); + } + System.out.println(line); + rows++; + } + } + return rows; + } + + private static void printTiming(Pushdown push, long sqlNanos, long pushdownNanos) { + double sqlMs = sqlNanos / 1e6 / ITERATIONS; + double pushMs = pushdownNanos / 1e6 / ITERATIONS; + System.out.println(); + System.out.printf("OHLC MIN/MAX/COUNT (%,d rows, %d zones) — %d repeated queries%n", + ROWS, ROWS / CHUNK, ITERATIONS); + System.out.printf(" %-14s %-22s %s%n", "AGGREGATE", "VALUE", "SOURCE"); + System.out.printf(" %-14s %-22s %s%n", "MIN(low)", push.minLow(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" %-14s %-22s %s%n", "MAX(high)", push.maxHigh(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" %-14s %-22d %s%n", "COUNT(*)", push.count(), "ZONE_STATS_PUSHDOWN"); + System.out.printf(" per-query avg — full scan (Calcite): %.2f ms | push-down (stats only): %.3f ms | %.0fx%n", + sqlMs, pushMs, sqlMs / pushMs); + } +} diff --git a/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java b/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java new file mode 100644 index 000000000..893e9f23d --- /dev/null +++ b/core/src/test/java/io/github/dfa1/vortex/core/testing/OhlcData.java @@ -0,0 +1,116 @@ +package io.github.dfa1.vortex.core.testing; + +import io.github.dfa1.vortex.core.model.DType; +import 
io.github.dfa1.vortex.core.model.PType; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/// Writer-agnostic synthetic OHLC (open/high/low/close + volume) data generator, shared by tests +/// across modules via the core test-jar. +/// +/// Produces deterministic column arrays (seeded random walk per ticker) and exposes the matching +/// [DType.Struct] schema. The date advances every 30 rows across the whole output while tickers +/// restart at the first symbol in each batch, so days line up with one row per ticker only when +/// the chunk size is a multiple of 30. It performs no I/O — each module writes +/// the batches with its own writer (the in-house Java writer, the JNI writer, …). +public final class OhlcData { + + /// The 30 ticker symbols, assigned round-robin within each batch. + public static final String[] TICKERS = { + "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "BRK.B", "JPM", "V", + "UNH", "XOM", "LLY", "JNJ", "MA", "PG", "HD", "MRK", "AVGO", "CVX", + "PEP", "ABBV", "KO", "COST", "WMT", "MCD", "CSCO", "ACN", "BAC", "CRM" + }; + + /// The column schema: `date` (I32 epoch day), `symbol` (Utf8), `open`/`high`/`low`/`close` + /// (F64), `volume` (I64); all non-nullable. + public static final DType.Struct SCHEMA = new DType.Struct( + List.of("date", "symbol", "open", "high", "low", "close", "volume"), + List.of( + new DType.Primitive(PType.I32, false), + new DType.Utf8(false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.F64, false), + new DType.Primitive(PType.I64, false)), + false); + + /// One chunk of OHLC rows as parallel column arrays.
+ /// + /// @param date epoch-day per row + /// @param symbol ticker per row + /// @param open opening price per row + /// @param high high price per row + /// @param low low price per row + /// @param close closing price per row + /// @param volume traded volume per row + public record Batch(int[] date, String[] symbol, double[] open, double[] high, + double[] low, double[] close, long[] volume) { + } + + private OhlcData() { + } + + /// Generates `rows` rows split into chunks of at most `chunkSize`, using the default seed. + /// + /// @param rows total number of rows + /// @param chunkSize maximum rows per batch + /// @return the batches, in row order + public static List generate(int rows, int chunkSize) { + return generate(rows, chunkSize, 42L); + } + + /// Generates `rows` rows split into chunks of at most `chunkSize`, seeded by `seed`. + /// + /// @param rows total number of rows + /// @param chunkSize maximum rows per batch + /// @param seed the PRNG seed + /// @return the batches, in row order + public static List generate(int rows, int chunkSize, long seed) { + double[] prices = new double[TICKERS.length]; + Arrays.fill(prices, 100.0); + var rng = new Random(seed); + int startDay = (int) LocalDate.of(2020, 1, 2).toEpochDay(); + int rowsDone = 0; + var batches = new ArrayList(); + + int rowsLeft = rows; + while (rowsLeft > 0) { + int n = Math.min(rowsLeft, chunkSize); + int[] date = new int[n]; + String[] symbol = new String[n]; + double[] open = new double[n]; + double[] high = new double[n]; + double[] low = new double[n]; + double[] close = new double[n]; + long[] volume = new long[n]; + for (int i = 0; i < n; i++) { + int ticker = i % TICKERS.length; + double px = prices[ticker]; + double ret = rng.nextGaussian() * 0.02; + double o = round(px * (1 + ret * 0.3)); + double c = round(px * (1 + ret)); + double spread = Math.abs(px * rng.nextDouble() * 0.03); + date[i] = startDay + (rowsDone + i) / TICKERS.length; + symbol[i] = TICKERS[ticker]; + open[i] = 
o; + high[i] = round(Math.max(o, c) + spread); + low[i] = round(Math.min(o, c) - spread); + close[i] = c; + volume[i] = Math.max(100_000L, Math.round(1_000_000 + rng.nextGaussian() * 200_000)); + prices[ticker] = c; + } + batches.add(new Batch(date, symbol, open, high, low, close, volume)); + rowsLeft -= n; + rowsDone += n; + } + return batches; + } + + private static double round(double v) { + return Math.round(v * 100.0) * 0.01; + } +} diff --git a/docs/adr/0018-calcite-sql-adapter.md b/docs/adr/0018-calcite-sql-adapter.md new file mode 100644 index 000000000..e33cb3e58 --- /dev/null +++ b/docs/adr/0018-calcite-sql-adapter.md @@ -0,0 +1,219 @@ +# ADR 0018: Apache Calcite SQL adapter — be a push-down source, not an engine + +- **Status:** Proposed +- **Date:** 2026-06-24 +- **Deciders:** project maintainer +- **Supersedes:** — +- **Superseded by:** — +- **Related:** [ADR 0013 — Compute primitives](0013-compute-primitives.md), + [ADR 0016 — vortex-arrow bridge](0016-vortex-arrow-bridge.md), + [ADR 0005 — Vector API adoption](0005-vector-api-adoption.md) + +## Context + +The reader exposes typed zero-copy columns and (ADR 0013) a path to filter/aggregate +push-down over the encoded form and the zone-map stats table. What it does *not* have is a +query language. A recurring ask is "can I run SQL over a Vortex file?" — `SELECT … WHERE … +GROUP BY …` rather than hand-written scan loops. + +Two ways to answer that: + +1. **Build a SQL engine.** Parser, type system, logical plan, cost-based optimiser, join + algorithms, aggregation/spilling, NULL semantics. Person-years, none of it + Vortex-specific — it is solved, generic, and not where this project's advantage lies. +2. **Adapt to an existing engine.** Plug the Vortex scan into a mature SQL front-end and let + it own parsing/planning/joins. The project keeps owning the one thing only it can do well: + a fast columnar scan with push-down into the encoding and the stats table. 
+ +The project's advantage is the **scan**, not the engine. An external engine only ever sees +*decoded* values, so it has already paid the cost Vortex could have skipped — chunk-skipping +via zone maps, encoded-domain comparison (`compare(ALPArray, scalar)` without materialising +doubles), and answering `MIN`/`MAX`/`COUNT`/`SUM` straight from the stats table. That asymmetry +only exists *inside* Vortex. So the goal is to expose the scan + push-down to an engine, not to +reimplement the engine. + +Apache Calcite is the natural JVM host: a pure-Java SQL parser + relational optimiser + +pluggable adapter framework (the substrate under Drill, Flink-SQL, Beam-SQL). It does the +generic work; the adapter contributes a table that knows how to push work down. + +### Prototype (this branch) + +A `calcite` module (`vortex-calcite`, Calcite 1.40.0) was built to de-risk and to anchor this +ADR in working code rather than speculation: + +- **JDK 25 / Janino gate.** `CalciteSmokeTest` runs a query that forces Calcite's Enumerable + convention to generate Java and compile it with Janino. It **passes on JDK 25** — Calcite's + runtime codegen works on this project's target, so the interpreter fallback is not needed. + This was the one real unknown (Janino has historically lagged on new class-file versions — + the same pressure that produced the in-house codegen of ADR 0017). +- **`VortexTable implements ScannableTable`** — derives the SQL row type from the file's + `DType.Struct` and decodes chunks into `Object[]` rows. Baseline only: every full-scan row + crosses the `Object[]` boundary. +- **`VortexSchema`** — maps a SQL table name to a `.vortex` file. +- **`VortexAggregates`** — the push-down core, deliberately built *beside* Calcite (not yet as + a planner rule): `MIN`/`MAX`/`COUNT` are read from footer zone-map stats with no data decode; + `SUM`/`AVG` fall back to a streaming scan because the writer emits no per-zone `SUM` stat yet. 
+- **`OhlcSqlDemoTest`** — 120 000-row OHLC file, computes the five aggregates via Calcite (full + scan, ground truth) and via push-down, asserts they agree. + +Measured on the demo (120 000 rows, 12 zones): + +``` +AGGREGATE VALUE SOURCE +MIN(low) 3.65 ZONE_STATS_PUSHDOWN +MAX(high) 1048.1 ZONE_STATS_PUSHDOWN +COUNT(*) 120000 ZONE_STATS_PUSHDOWN +SUM(volume) 120023864741 FULL_SCAN +AVG(volume) 1000198.87 FULL_SCAN +full scan (Calcite): ~577 ms | push-down (min/max/count from stats): ~5.9 ms (~100×) +``` + +Two SQL-semantics facts surfaced and must be carried forward by the production adapter: + +- `date` is a SQL reserved word — a column of that name must be quoted, or queries avoid it. +- Calcite's `AVG` over a `BIGINT` column does **integer** division; a true mean needs + `AVG(CAST(col AS DOUBLE))`. The adapter's own aggregate path must match whichever semantics + Calcite's planner applies, so a rule-based push-down stays bit-compatible with the fallback. + +## Decision + +**Ship a `vortex-calcite` adapter that makes Vortex a push-down SQL source for Apache Calcite. +Do not build a SQL engine. Keep the prototype's structure; productionise it in phases.** The +heavy Calcite dependency tree (Avatica, Guava, Janino) is **quarantined in this module** — +`core`/`reader`/`writer` stay dependency-light, the same isolation rule applied to the planned +`vortex-arrow` bridge (ADR 0016). + +Phased productionisation: + +- **Phase 0 — queryable (prototype, done).** `ScannableTable` + `VortexSchema`; `SELECT *` and + any aggregate work via full scan. Proves the wiring and the JDK 25 codegen path. +- **Phase 1 — filter + project push-down.** Implement `ProjectableFilterableTable`: projection + prunes columns (decode only what's asked); filters translate Calcite `RexNode`s into the + ADR 0013 `Predicate` vocabulary and push into the scan's `RowFilter` (zone-map chunk-skip + + encoded-domain compare). 
Unsupported `RexNode`s are left for Calcite — push-down is + best-effort, never a parse failure. +- **Phase 2 — aggregate push-down (the payoff).** A `RelOptRule` matching `Aggregate(TableScan)` + for `MIN`/`MAX`/`SUM`/`COUNT` rewrites to a stats-backed physical scan folding the zone-map + stats table — promoting today's `VortexAggregates` from a side helper to a planner rule. + `MIN`/`MAX`/`COUNT` work now; `SUM`/`AVG` need the writer to emit a per-zone `SUM` stat first + (ADR 0013 §6 — the same increment that also wants `NULL_COUNT`). + +### Prototype status (branch `feat/vortex-calcite-demo`) + +Phases 0–2 are implemented and tested: + +- **Phase 1 landed.** `VortexTable` is a `ProjectableFilterableTable`: projection prunes columns; + `=, <>, <, <=, >, >=, AND, BETWEEN, IN` translate to a reader `RowFilter` for zone-map chunk + skipping. Filters are pushed but **not consumed** (whole-chunk pruning is approximate, so + Calcite still row-filters). Demo: a `date` range over 1M rows decodes **1 of 100 chunks** (99% + pruned), exact result, and `EXPLAIN` shows `filters`/`projects` folded into `BindableTableScan`. +- **Phase 2 landed (MIN/MAX/COUNT).** `VortexAggregatePushDownRule` rewrites a whole-table + `MIN`/`MAX`/`COUNT` (no `GROUP BY`, numeric columns) into a single-row `LogicalValues` from the + stats — the optimized plan has **no scan and no aggregate**. `SUM`/`AVG` remain a scan pending + the writer per-zone `SUM` stat. + +Gotchas found and recorded for the production adapter: + +- **Reserved words.** Beyond `date`, the OHLC columns `close`/`open` (CLOSE/OPEN cursor) must be + quoted. A production adapter should quote all identifiers it emits. +- **Integer stats are `Long`.** Zone-map integer stats decode as `Long`, floats as `Double`, + regardless of column width. 
A filter literal boxed at the column's natural width (`Integer` for + `I32`) silently disables pruning because `ScanIterator.compareValues` swallows the resulting + `ClassCastException` and returns `0` (= "cannot prune"). The adapter coerces integer literals to + `Long`; the underlying reader trap is filed as issue #159. +- **`BETWEEN`/`IN` are `SEARCH(Sarg)`**, not `AND(>=,<=)`/`OR` — expand with `RexUtil.expandSearch` + before translating. +- **Calcite 1.40 removed `RelRule.Config.EMPTY`.** The modern `RelRule.Config` path needs the + Immutables annotation processor; the deprecated `RelOptRule` operand constructor is lighter for + a single adapter rule (localized `@SuppressWarnings("deprecation")`). + +The prototype registers the Phase-2 rule through a `HepPlanner` in a test. Wiring it into the +bare `jdbc:calcite:` planner (so SQL over JDBC is auto-rewritten) is the main remaining +productionisation step, alongside the writer `SUM` zone-stat. + +**Two doors, chosen by query shape.** Calcite is the right tool for *reducing* queries (filter +/ aggregate / group-by), where push-down shrinks the result and the `Object[]` boundary +amortises to near-nothing. It is the wrong tool for *bulk columnar extract* (`SELECT *` over a +large file, weak/no filter): every row boxes. That shape should bypass Calcite via the Arrow +C-Data export of ADR 0016 (Option B), columnar and zero-boxing. Calcite is never Vortex's +*execution* engine — the moment many rows flow *through* it, the wrong door was used. The +adapter's job is to push filter/project/aggregate down hard so what reaches `Object[]` is small +by construction. + +## Consequences + +### Positive + +- SQL over Vortex with no engine to build or maintain; Calcite owns parse/plan/optimise/join. +- The push-down surface is exactly the ADR 0013 primitive set — the adapter is the first real + consumer of that vocabulary, validating it against a real planner. 
+- Dependency isolation: only consumers who add `vortex-calcite` pay the Calcite/Janino/Guava + cost; the core stays clean and (per ADR 0017) JPMS-viable. +- Aggregate push-down gives the headline result — `MIN`/`MAX`/`COUNT` answered from a tiny stats + table, ~100× faster than a full scan on the prototype, exact against ground truth. +- Concrete motivation for the writer-side `SUM`/`NULL_COUNT` zone-stat increment: once emitted, + `SUM`/`AVG` join `MIN`/`MAX`/`COUNT(*)` in the no-decode tier, and `NULL_COUNT` extends it to + null-aware `COUNT(col)`. + +### Negative + +- Calcite's Enumerable execution is row-at-a-time `Object[]` with autoboxing. Acceptable for a + *source* (push-down does the heavy work before rows materialise), but it caps throughput on + any query that emits many rows — which is why bulk extract is pushed to the Arrow door, not + Calcite. +- A second public surface (SQL) with its own semantics gotchas (reserved words, `AVG` integer + division, three-valued logic) the adapter must honour exactly to stay consistent with the + push-down path. +- The `RelOptRule` for aggregate push-down is non-trivial Calcite-internal work; the kernel + matrix (Array variant × Predicate variant) from ADR 0013 still has to exist underneath. + +### Risks to manage + +- **Push-down/fallback divergence.** A pushed-down aggregate must produce bit-identical results + to Calcite's own computation over the scanned rows (NULL handling, integer vs double `AVG`, + overflow). Test every pushed aggregate against the full-scan ground truth, as the prototype + does. +- **Calcite version churn.** Calcite's adapter SPI and Avatica move between releases; pin the + version and keep the smoke test as the JDK-compatibility tripwire (re-run on JDK upgrades). +- **Temptation to make Calcite columnar.** Bridging Vortex into a vectorised execution engine is + the rabbit hole; that is DuckDB/DataFusion territory (native — the `vortex-jni` world). Resist + it. Keep Calcite as front-end + planner only.
+- **Lifetime.** `Object[]` rows hold decoded values, but any zero-copy path (future) must keep + the reader's `Arena` alive while the `Enumerator` is open — the same release-callback hazard + flagged for the Arrow C-Data bridge (ADR 0016). + +## Alternatives considered + +### A. Build a small bespoke SQL engine + +A hand-rolled parser + executor over the ADR 0013 kernels (single table, `WHERE`, simple +aggregates, no joins). Attractive as a *demo* of push-down, but it dies at the first join or +optimiser requirement and re-implements solved, generic machinery. Rejected as a product +direction; the prototype's value is the adapter + push-down helper, not a query language. + +### B. Hand off entirely to DuckDB / DataFusion via Arrow + +Export decoded columns through the Arrow C-Data interface (ADR 0016 Option B) and let a mature +native engine run all SQL. Least code, fastest *execution* — but the engine receives *decoded* +Arrow, so encoded-domain push-down (Vortex's whole edge) is lost; only zone-map chunk-skip +survives, and only if Vortex pre-filters before export. This is the right path for *bulk +extract* and for native consumers, and it coexists with this ADR as the "second door" — not a +replacement for JVM SQL with push-down. + +### C. Apache Calcite with full custom physical convention + +Implement a complete `Convention` with vectorised Vortex operators so execution stays columnar +end-to-end. Maximum performance, but it means building a vectorised execution engine inside +Calcite — most of option A's cost plus Calcite-internal complexity. Deferred indefinitely; the +`Object[]` Enumerable path plus aggressive push-down is sufficient for the source role, and bulk +throughput belongs to the Arrow door. 
+ +## References + +- Prototype: `feat/vortex-calcite-demo` — `calcite/` module (`VortexTable`, `VortexSchema`, + `VortexAggregates`, `VortexAggregatePushDownRule`, `CalciteSmokeTest`, `OhlcSqlDemoTest`, `AggregatePushDownTest`) +- [ADR 0013 — Compute primitives](0013-compute-primitives.md) §6 (aggregate push-down via + zone-map stats; `SUM`/`NULL_COUNT` writer increment) +- [ADR 0016 — vortex-arrow bridge](0016-vortex-arrow-bridge.md) (the "bulk extract" door) +- [ADR 0017 — in-house FlatBuffers codegen](0017-in-house-flatbuffers-codegen.md) (JDK 25 / + generated-code pressure that motivated the Janino smoke test) +- Apache Calcite adapter SPI — https://calcite.apache.org/docs/adapter.html diff --git a/docs/adr/ADR.md b/docs/adr/ADR.md index e75686a22..a84f01437 100644 --- a/docs/adr/ADR.md +++ b/docs/adr/ADR.md @@ -30,3 +30,4 @@ Each ADR is a Markdown file named `NNNN-short-title.md`. Use `template.md` as th | 0015 | Drop Materialized fallbacks once Lazy has shipped | Completed | 0.8.0 | | 0016 | vortex-arrow bridge module for Arrow interop | Proposed | | | 0017 | In-house FlatBuffers codegen + MemorySegment runtime | Accepted | | +| 0018 | Apache Calcite SQL adapter — push-down source | Proposed | | diff --git a/integration/pom.xml b/integration/pom.xml index b5334b7df..0f58d97e5 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -49,6 +49,12 @@ hardwood-core test + + io.github.dfa1.vortex + vortex-core + test-jar + test + io.github.dfa1.vortex vortex-reader diff --git a/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java b/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java index 19cbe59b5..6c317d3ab 100644 --- a/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java +++ b/integration/src/test/java/io/github/dfa1/vortex/integration/OhlcGenerator.java @@ -1,18 +1,15 @@ package io.github.dfa1.vortex.integration; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Arrays; +import
io.github.dfa1.vortex.core.testing.OhlcData; + import java.util.List; -import java.util.Random; -class OhlcGenerator { +/// Thin adapter over the shared [OhlcData] generator (core test-jar), mapping each batch to the +/// field order the integration writers expect (`symbols`/`dates` first). The random-walk lives in +/// [OhlcData]; this only adapts the shape. +final class OhlcGenerator { - static final String[] TICKERS = { - "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "BRK.B", "JPM", "V", - "UNH", "XOM", "LLY", "JNJ", "MA", "PG", "HD", "MRK", "AVGO", "CVX", - "PEP", "ABBV", "KO", "COST", "WMT", "MCD", "CSCO", "ACN", "BAC", "CRM" - }; + static final String[] TICKERS = OhlcData.TICKERS; private OhlcGenerator() { } @@ -22,41 +19,9 @@ static List generate(int totalRows, int batchSize) { } static List generate(int totalRows, int batchSize, long seed) { - double[] prices = new double[TICKERS.length]; - Arrays.fill(prices, 100.0); - var rng = new Random(seed); - int startDay = (int) LocalDate.of(2020, 1, 2).toEpochDay(); - int rowsLeft = totalRows; - int rowsDone = 0; - var batches = new ArrayList(); - - while (rowsLeft > 0) { - int n = Math.min(rowsLeft, batchSize); - String[] symbols = new String[n]; - int[] dates = new int[n]; - double[] open = new double[n], high = new double[n], low = new double[n], close = new double[n]; - long[] volume = new long[n]; - for (int i = 0; i < n; i++) { - int ticker = i % TICKERS.length; - double px = prices[ticker]; - double ret = rng.nextGaussian() * 0.02; - double o = Math.round(px * (1 + ret * 0.3) * 100.0) * 0.01; - double c = Math.round(px * (1 + ret) * 100.0) * 0.01; - double spread = Math.abs(px * rng.nextDouble() * 0.03); - symbols[i] = TICKERS[ticker]; - dates[i] = startDay + (rowsDone + i) / TICKERS.length; - open[i] = o; - high[i] = Math.round((Math.max(o, c) + spread) * 100.0) * 0.01; - low[i] = Math.round((Math.min(o, c) - spread) * 100.0) * 0.01; - close[i] = c; - volume[i] = Math.max(100_000L, 
Math.round(1_000_000 + rng.nextGaussian() * 200_000)); - prices[ticker] = c; - } - batches.add(new OhlcBatch(symbols, dates, open, high, low, close, volume)); - rowsLeft -= n; - rowsDone += n; - } - return batches; + return OhlcData.generate(totalRows, batchSize, seed).stream() + .map(b -> new OhlcBatch(b.symbol(), b.date(), b.open(), b.high(), b.low(), b.close(), b.volume())) + .toList(); } record OhlcBatch(String[] symbols, int[] dates, double[] open, double[] high, diff --git a/pom.xml b/pom.xml index 7d421574e..fe735a46b 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ bom cli inspector + calcite integration performance From 888698fe8bb252d61b2fa2ef2c9ba596bce2604c Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Thu, 25 Jun 2026 13:25:28 +0200 Subject: [PATCH 2/2] perf(calcite): stream scan rows lazily instead of materialising a List VortexTable.scan() built the entire result as a List (one fresh array + boxed cell per row) before returning. For a 1M-row full scan an async-profiler run showed ~72% of CPU in G1 GC: every row was promoted into the old gen and the whole result stayed live at once. Column decode itself was ~0.5%. Replace it with a streaming Enumerator that advances chunk by chunk, decoding each requested column once per chunk and yielding one row per moveNext(). Rows are no longer retained, so the working set is one chunk and rows die in the young gen. Fresh array per row is kept (correct for ORDER BY / joins that retain rows). Measured (CalciteDemo, 1M rows, MIN/MAX/COUNT full scan): GC 71% -> 3%, ~52 ms/query -> ~28 ms/query. CalciteDemo is a profiling harness, disabled unless -Ddemo.profile=true; run under async-profiler by attaching to the forked test JVM (argLine is owned by the byte-buddy agent goal, so attach by PID rather than -agentpath). 
Co-Authored-By: Claude Opus 4.8 --- .../dfa1/vortex/calcite/VortexTable.java | 129 ++++++++++++++---- .../dfa1/vortex/calcite/CalciteDemo.java | 61 +++++++++ 2 files changed, 162 insertions(+), 28 deletions(-) create mode 100644 calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java diff --git a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java index 65fa352e1..20f2a4024 100644 --- a/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java +++ b/calcite/src/main/java/io/github/dfa1/vortex/calcite/VortexTable.java @@ -15,8 +15,9 @@ import org.apache.calcite.DataContext; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; @@ -139,21 +140,108 @@ public Enumerable scan(DataContext root, List filters, int[] options = ScanOptions.columns(outNames); } - List rows = new ArrayList<>(); - long scanned = 0; - try (VortexReader reader = VortexReader.open(file); - ScanIterator scan = reader.scan(options)) { - while (scan.hasNext()) { - try (Chunk chunk = scan.next()) { - scanned++; - appendChunk(chunk, outNames, outTypes, rows); + // Stream rows lazily: decode one chunk at a time and yield a fresh row, so rows die young + // (in G1's young gen) instead of piling a whole-result List into the old gen — the + // dominant cost an async-profiler run showed for the full-scan path (~72% in GC). 
+ ScanOptions scanOptions = options; + return new AbstractEnumerable<>() { + @Override + public Enumerator enumerator() { + return new VortexEnumerator(scanOptions, outNames, outTypes); + } + }; + } + + /// Streaming [Enumerator] over a Vortex scan: advances chunk by chunk, decoding each requested + /// column once per chunk and materialising one `Object[]` row per [#moveNext()]. Rows are not + /// retained, so the working set stays at one chunk rather than the whole result. + private final class VortexEnumerator implements Enumerator { + + private final String[] names; + private final DType[] types; + private final VortexReader reader; + private final ScanIterator scan; + private Chunk chunk; + private Object[] columns; + private long rowInChunk; + private long chunkRows; + private Object[] current; + + private VortexEnumerator(ScanOptions options, String[] names, DType[] types) { + this.names = names; + this.types = types; + chunksScannedLastQuery.set(0); + VortexReader openedReader = null; + try { + openedReader = VortexReader.open(file); + this.reader = openedReader; + this.scan = openedReader.scan(options); + } catch (IOException e) { + closeQuietly(openedReader); + throw new UncheckedIOException("cannot scan " + file, e); + } catch (RuntimeException e) { + closeQuietly(openedReader); + throw e; + } + } + + private void closeQuietly(VortexReader r) { + if (r != null) { + r.close(); + } + } + + @Override + public Object[] current() { + return current; + } + + @Override + public boolean moveNext() { + while (true) { + if (chunk != null && rowInChunk < chunkRows) { + Object[] row = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + row[c] = value(columns[c], types[c], rowInChunk); + } + rowInChunk++; + current = row; + return true; + } + if (chunk != null) { + chunk.close(); + chunk = null; + } + if (!scan.hasNext()) { + return false; + } + chunk = scan.next(); + chunksScannedLastQuery.incrementAndGet(); + chunkRows = chunk.rowCount(); + 
rowInChunk = 0; + columns = new Object[names.length]; + for (int c = 0; c < names.length; c++) { + columns[c] = chunk.column(names[c]); } } - } catch (IOException e) { - throw new UncheckedIOException("cannot scan " + file, e); } - chunksScannedLastQuery.set(scanned); - return Linq4j.asEnumerable(rows); + + @Override + public void reset() { + throw new UnsupportedOperationException("VortexEnumerator does not support reset"); + } + + @Override + public void close() { + try { + if (chunk != null) { + chunk.close(); + } + } finally { + scan.close(); + reader.close(); + } + } } private DType.Struct struct() { @@ -175,21 +263,6 @@ private static int[] allColumns(int n) { return all; } - private static void appendChunk(Chunk chunk, String[] names, DType[] types, List rows) { - long n = chunk.rowCount(); - Object[] arrays = new Object[names.length]; - for (int c = 0; c < names.length; c++) { - arrays[c] = chunk.column(names[c]); - } - for (long r = 0; r < n; r++) { - Object[] row = new Object[names.length]; - for (int c = 0; c < names.length; c++) { - row[c] = value(arrays[c], types[c], r); - } - rows.add(row); - } - } - private static Object value(Object array, DType type, long r) { return switch (type) { case DType.Primitive p -> switch (p.ptype()) { diff --git a/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java new file mode 100644 index 000000000..7ac52b53c --- /dev/null +++ b/calcite/src/test/java/io/github/dfa1/vortex/calcite/CalciteDemo.java @@ -0,0 +1,61 @@ +package io.github.dfa1.vortex.calcite; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +/// Profiling harness (disabled in normal runs). 
Writes a 1M-row OHLC Vortex file once, then runs +/// the full-table `MIN/MAX/COUNT` SQL many times through Calcite so a CPU profile has enough +/// samples to show where the scan path spends its time. +/// +/// Run under async-profiler by attaching to the forked test JVM by PID. Do not pass `-agentpath` +/// via `-DargLine`: `argLine` is owned by the byte-buddy agent goal in this build. +/// ``` +/// ./mvnw test -pl calcite -am -Dtest=CalciteDemo -Ddemo.profile=true +/// # meanwhile, from a second shell, against the forked test JVM: +/// asprof -d 30 -e cpu -o collapsed -f /tmp/calcite.collapsed $TEST_JVM_PID +/// ``` +class CalciteDemo { + + @Test + @EnabledIfSystemProperty(named = "demo.profile", matches = "true") + void profileFullScan() throws Exception { + int rows = Integer.getInteger("demo.rows", 1_000_000); + int iterations = Integer.getInteger("demo.iterations", 300); + + Path file = Files.createTempFile("ohlc-demo", ".vortex"); + try { + OhlcGenerator.write(file, rows, 10_000); + System.out.printf("wrote %,d rows -> %.1f MB%n", rows, Files.size(file) / 1048576.0); + + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + String sql = "select min(low) lo, max(high) hi, count(*) c from vtx.ohlc"; + + try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info)) { + conn.unwrap(org.apache.calcite.jdbc.CalciteConnection.class).getRootSchema() + .add("vtx", new VortexSchema(Map.of("ohlc", file))); + + long t0 = System.nanoTime(); + long count = 0; + for (int i = 0; i < iterations; i++) { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + rs.next(); + count = rs.getLong("c"); + } + } + double ms = (System.nanoTime() - t0) / 1e6 / iterations; + System.out.printf("full scan x%d: %.2f ms/query | count=%,d%n", iterations, ms, count); + } + } finally { + Files.deleteIfExists(file); + } + } +}