
fix(#74): exclude CodegenFallback children from TimedWithCodegenExec stage #75

Merged

minskya merged 7 commits into main from claude/fix-issue-74-efwfP on May 10, 2026

Conversation

@minskya (Contributor) commented May 9, 2026

Fixes #74.

Summary

from_json (and any other CodegenFallback expression that reads ctx.INPUT_ROW) NPE'd in Block.code interpolation when DataFlint's TimedWithCodegenExec wrapped the operator that contained it under whole-stage codegen.

java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "arg" is null
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotInterpolateClassIntoCodeBlockError
    at org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1
    ...
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode
    at org.apache.spark.sql.catalyst.expressions.JsonToStructs.doGenCode

Root cause

Spark's CollapseCodegenStages already excludes plans whose expressions contain a CodegenFallback. But TimedWithCodegenExec exposes children = child.children (transparent wrapping for the SQL UI), which hides the wrapped child's expressions from that check — so an operator that would normally be broken out of whole-stage codegen ends up inside it, behind the wrapper, and the code"…" interpolator NPEs on the null INPUT_ROW.
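
A toy model of the hiding (plain Scala, not Spark's classes; Plan/Timed here are stand-ins, not SparkPlan): any scan that walks children never visits the wrapped node itself, so its expressions are never inspected:

```scala
// Toy stand-ins only; not Spark's SparkPlan API.
sealed trait Plan {
  def children: Seq[Plan]
  def hasFallbackExpr: Boolean = false
}
case object Scan extends Plan { def children = Nil }
case class Project(child: Plan) extends Plan {
  def children = Seq(child)
  override def hasFallbackExpr = true  // stands in for a from_json expression
}
case class Timed(child: Plan) extends Plan {
  def children: Seq[Plan] = child.children  // transparent: re-exports grandchildren
}

def anyFallback(p: Plan): Boolean =
  p.hasFallbackExpr || p.children.exists(anyFallback)

assert(anyFallback(Project(Scan)))          // visible when inspected directly...
assert(!anyFallback(Timed(Project(Scan))))  // ...invisible behind the transparent wrapper
```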

The first attempt (the original commit on this branch) tried to plumb row.value through doConsume. That approach didn't actually prevent the NPE — Spark's consume() still resets ctx.INPUT_ROW = null before calling the parent's doConsume — and it also broke prepareRowVar's projection logic (corrupt counts, occasionally invalid Java in the generated method signature). It's been reverted on this branch.

The fix

Mirror the same CodegenFallback check that CollapseCodegenStages.supportCodegen already does, inside TimedWithCodegenExec.supportCodegen:

```scala
override def supportCodegen: Boolean = {
  val c = child.asInstanceOf[CodegenSupport]
  c.supportCodegen &&
    (TimedExec.isLegacySpark || child.children.length <= 1) &&
    // Same CodegenFallback scan CollapseCodegenStages.supportCodegen performs;
    // find(...).isDefined instead of exists(...) for Spark 3.0/3.1 compatibility.
    !child.expressions.exists(_.find(_.isInstanceOf[CodegenFallback]).isDefined)
}
```

When the wrapped child has any CodegenFallback expression, we report supportCodegen = false. CollapseCodegenStages then inserts an InputAdapter between us and the next stage, breaking the whole-stage at the boundary — exactly the behavior end users get without the wrapper. The expression goes through the interpreted path, where INPUT_ROW doesn't matter.

TreeNode.find (not .exists) is used because Expression.exists was added in Spark 3.2; calling it at runtime on Spark 3.0/3.1 throws NoSuchMethodError. find has been on TreeNode since the earliest 3.x lines.
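
Side by side, the two forms (the helper names here are illustrative, not from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback

// Equivalent scans for a CodegenFallback in an expression tree; only the
// second resolves against a method present on every supported Spark 3.x line.
def fallsBackSpark32Plus(expr: Expression): Boolean =
  expr.exists(_.isInstanceOf[CodegenFallback])           // TreeNode.exists: Spark 3.2+
def fallsBackAllSpark3x(expr: Expression): Boolean =
  expr.find(_.isInstanceOf[CodegenFallback]).isDefined   // TreeNode.find: Spark 3.0+
```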

Test coverage

  • DataFlintCodegenFallbackSpec — new regression test that runs the exact PySpark snippet from the issue (from_json → explode → filter; a Scala sketch follows this list) under whole-stage codegen and asserts the count matches. Without the fix this NPEs; with it, the wrapped ProjectExec is correctly excluded from the WholeStage and the query runs.
  • Tests run against both Spark 3.5 and Spark 4.0.1. pluginspark4 previously had no test setup at all; this PR adds one that shares version-portable specs from pluginspark3 so the same regression suite executes on the Spark 4 surface.
  • DataFlintWriteMetricsSpec — guard for the executeCollect rebuild path: confirms DataWritingCommandExec metrics (numFiles, numOutputBytes, numOutputRows, etc.) survive the rebuild via the shared cmd instance.
  • TimedExecMetricsSpec — covers the rddId / canEqual fixes below.
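
For reference, a hedged Scala equivalent of the repro scenario from the first bullet (the issue's snippet is PySpark; the schema and data here are illustrative, and spark is the test's SparkSession):

```scala
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
import spark.implicits._  // `spark` must be a stable SparkSession reference

val schema = ArrayType(new StructType().add("id", IntegerType))
val out = Seq("""[{"id": 1}, {"id": 2}]""").toDF("json")
  // JsonToStructs (from_json) is a CodegenFallback expression
  .select(explode(from_json(col("json"), schema)).as("item"))
  .filter(col("item.id") > 1)

out.count()  // NPE'd under whole-stage codegen before the fix; runs interpreted after
```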

Drive-by hardening on TimedExec

While reviewing the wrapper for adjacent issues, this PR also fixes five small things that the original CI / runtime exposed:

  1. postRddId accumulated: was +=, now set — re-executing the same TimedExec instance no longer sums every RDD id it ever wrapped.
  2. canEqual collapsed variants: TimedExec(x).equals(TimedWithCodegenExec(x)) was true; they have different execution semantics so canonicalization could conflate them. Now matches by exact runtime class (toy illustration after this list).
  3. isLegacySpark parser: wrapped in Try — vendor versions with non-numeric major/minor parts no longer throw at class-load time.
  4. executeCollect write path: child.children.head / innerChild.children.head replaced with headOption fall-through to super.executeCollect() if the assumed shape doesn't hold.
  5. rddId metric type: was a "size" metric (rendered as bytes — "12 B" in the SparkUI), now a plain sum metric.
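
A standalone illustration of item 2, the canEqual change (toy classes, not the plugin's real ones):

```scala
// Toy wrapper hierarchy: subclasses have different semantics, so they must
// not compare equal even when they wrap the same child.
class Timed(val child: String) {
  def canEqual(that: Any): Boolean = that.getClass == this.getClass  // exact-class match
  override def equals(that: Any): Boolean = that match {
    case t: Timed => t.canEqual(this) && t.child == child
    case _        => false
  }
  override def hashCode: Int = child.hashCode
}
class TimedWithCodegen(child: String) extends Timed(child)

assert(new Timed("x") == new Timed("x"))            // same class: still equal
assert(new Timed("x") != new TimedWithCodegen("x")) // variants no longer conflated
```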

CI

CI was running on Java 8. Spark 4.0.1 needs Java 17+, so adding pluginspark4 tests caused the forked test JVM to hang at startup — every run timed out at 6h. Bumped to temurin@17. Spark 3.5 also supports Java 17, so pluginspark3 tests run on the same JVM unchanged. The published jars stay Java 8 compatible — verified via javap on the produced classes (major version 52 across all four module/Scala variants); Scala 2.12/2.13 default to Java 8 bytecode regardless of which JVM compiles them.
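
The Java-8 bytecode claim is easy to re-verify without javap; a minimal sketch that reads the class-file header directly (the path is a placeholder, not a real artifact):

```scala
import java.io.{DataInputStream, FileInputStream}

// Class-file layout: magic (4 bytes), minor version (2 bytes), major version (2 bytes).
// Major version 52 = Java 8 bytecode.
val in = new DataInputStream(new FileInputStream("target/classes/Example.class"))
try {
  in.skipBytes(6)
  val major = in.readUnsignedShort()
  assert(major == 52, s"expected Java 8 bytecode (52), got $major")
} finally in.close()
```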

Test plan

  • DataFlintCodegenFallbackSpec reproduces the original NPE without the fix and passes with it
  • sbt +test green on Spark 3.5 (Scala 2.12 + 2.13) and Spark 4.0.1 (Scala 2.13)
  • pluginspark4 runs DataFlintCodegenFallbackSpec against Spark 4.0.1 / Java 17
  • PySpark cross-version harness (utils/.spark-versions/3.1.2) — parquet write no longer throws NoSuchMethodError

🤖 Generated with Claude Code

TimedWithCodegenExec.doConsume was dropping the row: ExprCode parameter
when calling consume(ctx, input). This caused ctx.INPUT_ROW to be null
for any downstream CodegenFallback expression (e.g. from_json) that
interpolates INPUT_ROW into its generated code, producing an NPE inside
Block.code interpolation.

Pass row.value through to consume so downstream CodegenFallback
expressions see a valid INPUT_ROW under whole-stage codegen.
@CLAassistant

CLAassistant commented May 9, 2026

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ minskya
❌ claude

claude and others added 6 commits May 9, 2026 13:08
The previous fix (propagate `row.value` in doConsume) didn't actually
prevent the NPE — Spark's `consume()` resets `ctx.INPUT_ROW = null`
internally before calling parent.doConsume, so a downstream
CodegenFallback still saw null and NPE'd in Block.code interpolation.
It also produced corrupt output (count returned garbage) and could emit
syntactically invalid Java when row.value was a complex expression.

Root cause: Spark's `CollapseCodegenStages` already excludes plans whose
expressions contain a CodegenFallback (e.g. from_json / JsonToStructs).
But TimedWithCodegenExec exposes its child's children transparently
(`children = child.children`), hiding the wrapped child from that check
— so a node that wouldn't normally be wholestaged ends up wholestaged
behind a TimedExec wrapper, and the CodegenFallback expression generates
code referencing the now-null ctx.INPUT_ROW.

Fix: mirror that CodegenFallback check inside
TimedWithCodegenExec.supportCodegen. When the wrapped child has any
CodegenFallback expression, we report supportCodegen=false so the
framework inserts an InputAdapter and breaks the WholeStage at this
boundary — exactly what would happen without the wrapper.

Reverts the doConsume change from the previous commit.
pluginspark4 had no test setup, so the Spark-4 build of the plugin
source was only compile-checked, never exercised. Add a test
configuration mirroring pluginspark3 (scalatest, Spark 4.0.1 test
deps, fork + --add-opens) and pull in DataFlintCodegenFallbackSpec
so the issue #74 regression test runs against both Spark 3.5 and
Spark 4.0.

Most pluginspark3 specs depend on Spark-3-only internals (Dataset
constructor, PythonMapInArrowExec, etc.) and won't compile against
Spark 4, so the test source list is explicit rather than a directory
include — only version-portable suites are added.
An earlier review claimed the executeCollect rebuild path leaves
WriteFilesExec / DataWritingCommandExec metrics stale on the original
tree. This test verifies — and disproves — that claim:

- DataWritingCommandExec.metrics delegates to cmd.metrics, and
  withNewChildren reuses the same cmd instance, so numOutputRows /
  numFiles / numOutputBytes are the same SQLMetric objects on the
  original and rebuilt nodes.
- WriteFilesExec (3.4+) has no metrics field of its own.
- The data plan is shared by reference via RDDTimingWrapper.

Net: the rebuild is metric-safe today. The test asserts those values
flow through to the UI-visible plan tree so that any future change
that breaks one of those assumptions fails loudly.
- postRddId: switch from += to set so re-executing the same TimedExec
  instance overwrites the RDD-id metric instead of summing across runs.
  Test confirms metric == latest, not the sum (TimedExecMetricsSpec).
- canEqual: match runtime class so TimedExec(x) and TimedWithCodegenExec(x)
  no longer compare equal — TreeNode equality / canonicalization use this
  for plan reuse and the two have different execution semantics.
- isLegacySpark: wrap version parse in Try; vendor strings with non-numeric
  major/minor default to non-legacy (matches every release ≥ 3.2).
- executeCollect write-path: replace child.children.head /
  innerChild.children.head with headOption-based fall-through to
  super.executeCollect() if the assumed shape doesn't hold (vendor write
  commands, future Spark layouts).
- rddId metric: use a plain sum metric instead of size — size renders the
  RDD id as bytes ("12 B") in the SparkUI.

RDDTimingWrapper now carries an explicit comment about the durationMetric
not being in productArity (we override withNewChildrenInternal to plumb
it through manually).
The pluginspark4 test config added in c42752e forks a JVM and bootstraps
Spark 4.0.1, which requires Java 17+. CI was running Java 8 (temurin@8),
so the forked JVM hung trying to start Spark 4 — every PR run since that
commit timed out at the 6h limit.

Spark 3.5 also supports Java 17, so plugin/pluginspark3 tests run on the
same JVM with no behavioral change. Scala 2.12/2.13 default to Java 8
bytecode (verified via javap on the existing target/ output: major
version 52 across all four module/scala variants), so the published jars
stay loadable by Java 8 runtimes — only the build/test JVM changes.
`Expression.exists(predicate)` was added to TreeNode in Spark 3.2.
Calling it on Spark 3.0/3.1 throws NoSuchMethodError at runtime even though
the code compiles fine against newer Spark headers — the build/Spark
versions are decoupled at runtime when DataFlint is loaded as a plugin.

Replace `_.exists(_.isInstanceOf[CodegenFallback])` with
`_.find(_.isInstanceOf[CodegenFallback]).isDefined`. `find` has been on
TreeNode since the earliest 3.x lines, so the supportCodegen check now
works across the whole supported range (3.0 → 4.x).

Reproduced via the cross-version pyspark integration harness:
utils/.spark-versions/3.1.2 → java.lang.NoSuchMethodError on
Expression.exists during a parquet write. With this change the same
write succeeds.
@minskya minskya requested a review from menishmueli May 10, 2026 08:08
@minskya minskya changed the title from "fix: propagate row in TimedWithCodegenExec.doConsume (#74)" to "fix(#74): exclude CodegenFallback children from TimedWithCodegenExec stage" on May 10, 2026
@minskya minskya merged commit 705be40 into main May 10, 2026
6 of 7 checks passed


Development

Successfully merging this pull request may close these issues.

TimedWithCodegenExec.doConsume drops row: ExprCode → NPE in CodegenFallback expressions (e.g. from_json) under whole-stage codegen
