Skip to content

[Feature][DataLoader] Add DataFusion SQL transformation support#496

Merged
ShreyeshArangath merged 20 commits intolinkedin:mainfrom
ShreyeshArangath:feat/add-basic-datafusion-integration
Mar 16, 2026
Merged

[Feature][DataLoader] Add DataFusion SQL transformation support#496
ShreyeshArangath merged 20 commits intolinkedin:mainfrom
ShreyeshArangath:feat/add-basic-datafusion-integration

Conversation

@ShreyeshArangath
Copy link
Collaborator

@ShreyeshArangath ShreyeshArangath commented Mar 11, 2026

Summary

  • Replace the LogicalPlan/Substrait-based TableTransformer with a simpler SQL-string interface — transformers return a SQL string (or None) that is executed per-batch via DataFusion
  • Each split creates one DataFusion SessionContext and rebinds the batch table for each record batch, avoiding per-batch session overhead

High-Level Design

The TableTransformer.transform() method is called once during planning to produce a SQL string. That SQL is then attached to every DataLoaderSplit and executed per-batch at iteration time. This keeps the transformer interface simple (no DataFusion dependency for implementors) and the SQL string is trivially picklable for distributed execution.

OpenHouseDataLoader.__iter__()
      ├── transformer.transform(table_id, context) → SQL string (once)
      ├── to_datafusion_sql(sql, dialect) → transpile if needed
      └── for each file scan task:
            yield DataLoaderSplit(scan_context, transform_sql)


  DataLoaderSplit.__iter__()
    ├── SessionContext created (once per split)
    └── for each batch from ArrowScan:
          ├── bind batch as table_id.sql_name
          ├── execute transform SQL
          └── yield result batches

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

ShreyeshArangath and others added 10 commits March 10, 2026 12:20
Connect the TableTransformer plumbing so transforms (e.g. column masking)
are applied per-batch at read time. The transformer is called once at the
loader level to validate it returns non-None, then passed to each
DataLoaderSplit which calls it per-batch with a fresh SessionContext.

Key design decisions:
- Transformer-per-batch instead of LogicalPlan replay: DataFusion plans
  carry table sources by reference, so a plan created in one SessionContext
  cannot read data registered in another. The transformer is re-invoked
  per batch instead.
- Column projection skipped when transform active: the transform may
  reference columns beyond the user's projection. Pushdown deferred to
  follow-up.
- table_name is a required field on TableScanContext, always derived from
  TableIdentifier.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add sql_name property to TableIdentifier, extract shared session-setup
helper, remove table_name from TableScanContext, and document the
empty-data probe contract on TableTransformer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace Substrait-based transform pipeline with SQL string approach:
TableTransformer now returns a SQL string instead of a DataFrame,
eliminating serialization complexity. Each split executes the SQL
against its own DataFusion session with real batch data. Add pickle
support to TableScanContext and DataLoaderSplit for distributed execution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Escape embedded double quotes in SQL identifiers to prevent SQL injection
- Replace assert with proper RuntimeError for optimized-mode safety
- Fail fast at construction when transform_sql is given without table_id
- Pass execution context explicitly to _build_transform_sql for testability
- Deduplicate test helpers and add identifier escaping tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Create the session once per split instead of once per batch. Extract
_bind_batch_table helper that deregisters then re-registers each batch.
Adds tests for session reuse and batch rebinding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts: keep SQL-string transform design over Substrait/LogicalPlan
approach. Pick up upstream fix for load_file_io location parameter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
TableIdentifier naturally belongs with the scan-level context rather
than being passed separately to each split. This simplifies the
DataLoaderSplit constructor and keeps table identity co-located with
the other scan metadata that travels through pickle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts by combining transform/UDF support from the feature
branch with batch_size/ArrivalOrder support from main. The feature
branch's transform_sql approach supersedes main's substrait plan
scaffolding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Make TableScanContext.table_id a required field (it is always provided)
- Remove redundant None guards for table_id in DataLoaderSplit
- Add DataLoaderRuntimeError(RuntimeError) for internal invariant violations
- Export DataLoaderRuntimeError from the public API

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ShreyeshArangath ShreyeshArangath marked this pull request as ready for review March 11, 2026 23:15
ShreyeshArangath and others added 2 commits March 11, 2026 16:25
…check

Replace unreachable RuntimeError guard in _apply_transform with an
assert, and remove the unused _exceptions module and its public export.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflict in test_data_loader.py by keeping both transformer
tests (this branch) and branch tests (upstream).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Raise ValueError when column projections are used with table transformers
  instead of silently skipping projection
- Move sql_name out of TableIdentifier into data_loader_split as
  to_sql_identifier() to keep TableIdentifier free of DataFusion knowledge
- Remove unnecessary assert in _apply_transform
- Update tests accordingly

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
robreeves added a commit that referenced this pull request Mar 16, 2026
## Summary

Add a custom [SQLGlot](https://github.com/tobymao/sqlglot) dialect for
DataFusion and a `to_datafusion_sql` function that transpiles SQL from
any supported source dialect to DataFusion SQL.

This is the first step in decoupling the `TableTransformer` API from
DataFusion internals. Instead of returning a DataFusion DataFrame
(leaking the execution engine to users), the `TableTransformer` will
return a SQL string and its dialect. The data loader will then use
SQLGlot to translate that SQL to DataFusion for execution. It will be
used in #496.

We maintain the DataFusion dialect in-repo rather than contributing it
upstream to SQLGlot because the SQLGlot maintainers don't have capacity
to review more community dialects right now
([source](tobymao/sqlglot#7275 (comment))).

Context:
#496 (comment)

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**DataFusion dialect** (`datafusion_sql.py`): custom SQLGlot dialect
with DataFusion-specific function mappings (e.g. `SIZE` → `cardinality`,
`ARRAY()` → `make_array`, `CURRENT_TIMESTAMP()` → `now()`), type
mappings (e.g. `CHAR`/`TEXT` → `VARCHAR`, `BINARY` → `BYTEA`), and
identifier/normalization rules.

**SQL translator** (`datafusion_sql.py`): `to_datafusion_sql(sql,
source_dialect)` accepts any supported source dialect (spark, postgres,
mysql, etc.) and transpiles to DataFusion. When source_dialect is
`"datafusion"` it returns the SQL unchanged. Validates the dialect with
a clear error listing all supported options.

**Dependency**: added `sqlglot>=29.0.0`.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Parametrized transpilation tests cover spark, mysql, postgres, and
datafusion identity. Edge case tests for unsupported dialects and
multi-statement errors. E2E test executes transpiled SQL against
DataFusion and validates output data.

```
make check  # All checks passed (ruff, mypy)
make test   # 19 dialect tests pass
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

This is the first PR. Follow-up PRs will integrate the translator into
the `TableTransformer` API and data loader pipeline.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
ShreyeshArangath and others added 3 commits March 16, 2026 12:32
Add dialect property to TableTransformer (defaults to "datafusion") and
call to_datafusion_sql() in _build_transform_sql so transformers can
return SQL in any SQLGlot-supported dialect.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Keep the transpilation call in data_loader but default to "datafusion"
directly rather than adding a dialect property to the ABC.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ShreyeshArangath ShreyeshArangath marked this pull request as draft March 16, 2026 19:38
ShreyeshArangath and others added 3 commits March 16, 2026 12:52
Restore the dialect property on TableTransformer as a plain str so
transformers can declare their source SQL dialect. The data loader
transpiles non-DataFusion dialects to DataFusion via SQLGlot. Simplify
validation by removing the Dialect type alias and letting to_datafusion_sql
be the single runtime validator.

Add tests for Spark-dialect transpilation and invalid dialect rejection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ShreyeshArangath ShreyeshArangath marked this pull request as ready for review March 16, 2026 19:57
…ation details

- Simplify TableTransformer class docstring (remove verbose super().__init__ instructions)
- Fix dialect arg description to describe what, not how
- Remove unnecessary "must not depend on row data" note from transform() docstring
- Simplify _build_transform_sql docstring (remove implementation details)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ShreyeshArangath ShreyeshArangath merged commit fe1a005 into linkedin:main Mar 16, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants