
spark connect #37

Open
dhama-shashank-meesho wants to merge 10 commits into master from spark-connect

Conversation


@dhama-shashank-meesho dhama-shashank-meesho commented Mar 3, 2026

What is this PR for?

A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html

What type of PR is it?

Bug Fix
Improvement
Feature
Documentation
Hot Fix
Refactoring
Please leave only your type of PR

Todos

  • Task

What is the Jira issue?

How should this be tested?

  • Strongly recommended: add automated unit tests for any new or changed behavior
  • Outline any manual steps to test the PR here.

Screenshots (if appropriate)

Questions:

  • Do the license files need to be updated?
  • Are there breaking changes for older versions?
  • Does this need documentation?

Summary by CodeRabbit

  • New Features

    • Spark Connect support: SQL, PySpark and IPySpark interpreters with per-user session limits, per‑notebook locking, optional streaming results, configurable result limits and stacktrace behavior, and connection/token/SSL options.
    • Safe Python wrappers and utilities for Spark Connect to interact with DataFrames/sessions and render or stream tabular results.
  • Chores

    • Added a new spark-connect module for packaging and distribution.
  • Tests

    • Unit and integration tests for connection strings, utilities, SQL/PySpark flows, limits, streaming and end‑to‑end Spark Connect scenarios.


coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a new spark-connect Maven module providing Spark Connect interpreters (SparkConnect, SQL, PySpark, IPySpark), per-user session management, per-notebook locking, DataFrame connection/streaming utilities, Python wrappers, shading config, interpreter settings, and tests.

Changes

Cohort / File(s) Summary
Module setup & resources
pom.xml, spark-connect/pom.xml, spark-connect/src/main/resources/interpreter-setting.json
Adds spark-connect Maven module and module POM (dependencies, shading/relocations, profile). Adds interpreter-setting JSON defining interpreters, properties, editors, and runtime toggles.
Core interpreters (Java)
spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java, .../SparkConnectSqlInterpreter.java, .../PySparkConnectInterpreter.java, .../IPySparkConnectInterpreter.java
New interpreters: SparkConnectInterpreter (remote Spark Connect session, per-user session-slot limits, SQL splitting, streaming/results rendering, error handling, cancel), SparkConnectSqlInterpreter (per-note locking, SQL execution/streaming), PySparkConnectInterpreter and IPySparkConnectInterpreter (Python/IPython bridges, env resolution, propagate InterpreterContext).
Concurrency & locking utilities
spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java
Adds per-notebook fair ReentrantLock manager (create/get/remove) to serialize statement execution per notebook.
DataFrame & connection utilities
spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java
Adds connection-string builders (token/SSL/user handling) and DataFrame rendering/streaming helpers producing Zeppelin %table output with truncation, reserved-char escaping, and row-limit guards.
Python wrappers
spark-connect/src/main/resources/python/zeppelin_sparkconnect.py, spark-connect/src/main/resources/python/zeppelin_isparkconnect.py
Adds Python modules exposing SparkConnectSession and SparkConnectDataFrame wrappers with safe collect/toPandas limits, iteration guards, common DataFrame APIs, and warnings to avoid large/unsafe collects.
Tests
spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java, .../SparkConnectSqlInterpreterTest.java, .../PySparkConnectInterpreterTest.java, .../SparkConnectUtilsTest.java
Adds integration tests (gated by SPARK_CONNECT_TEST_REMOTE) for session creation, SQL execution, DDL, limits, DataFrame handling, and unit tests for connection-string and reserved-character handling.
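The SparkConnectUtils cohort above mentions rendering DataFrames to Zeppelin's %table output with truncation, reserved-character escaping, and a row-limit guard. A minimal Python sketch of that idea (function and parameter names are illustrative, not taken from the PR; Zeppelin's %table format is tab-separated columns and newline-separated rows, so tabs and newlines inside cell values must be escaped):

```python
# Hypothetical sketch of %table rendering: escape reserved characters,
# truncate long cells, and cap the number of emitted rows.
def render_table(columns, rows, max_rows=1000, truncate_at=100):
    def escape(value):
        text = str(value)
        if len(text) > truncate_at:                        # truncate long cells
            text = text[:truncate_at] + "..."
        # Tabs and newlines are %table's delimiters, so replace them in values.
        return text.replace("\t", " ").replace("\n", " ")

    lines = ["\t".join(escape(c) for c in columns)]
    for row in rows[:max_rows]:                            # row-limit guard
        lines.append("\t".join(escape(v) for v in row))
    return "%table " + "\n".join(lines)
```

With input `[[1, "a\tb"]]` the tab inside the cell is replaced so it cannot be mistaken for a column separator.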

Sequence Diagram(s)

sequenceDiagram
  participant Client as Zeppelin UI / Notebook
  participant SQLInterp as SparkConnectSqlInterpreter
  participant LockMgr as NotebookLockManager
  participant CoreInterp as SparkConnectInterpreter
  participant Remote as Spark Connect Server
  participant Utils as SparkConnectUtils

  Client->>SQLInterp: execute SQL (noteId, query)
  SQLInterp->>LockMgr: acquire notebook lock(noteId)
  SQLInterp->>CoreInterp: submit statement(s)
  CoreInterp->>Remote: execute via SparkSession (gRPC)
  Remote-->>CoreInterp: return DataFrame/result
  CoreInterp->>Utils: render/stream DataFrame
  Utils-->>Client: write %table output
  SQLInterp->>LockMgr: release notebook lock(noteId)
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
Check name: Dynamic Configuration Validation
Status: ✅ Passed
Explanation: This PR does not introduce any changes to files matching the application-dyn-*.yml pattern.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 9

🧹 Nitpick comments (11)
spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java (1)

41-44: Consider extracting lock policy literal and hardening utility shape.

Line 43 inlines the fairness literal. Consider promoting it to a named constant (and optionally making the class non-instantiable) for consistency and maintainability.

As per coding guidelines, "we should not use the literal values directly inside functions, they should be stored into separate variables and the variables should be used".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java`
around lines 41 - 44, Extract the fairness literal from the ReentrantLock
instantiation into a named private static final boolean (e.g., FAIR_LOCK = true)
and replace the inline literal in getNotebookLock (and any other uses) with that
constant; also harden the utility by preventing instantiation of
NotebookLockManager (add a private constructor) and ensure notebookLocks and
getNotebookLock remain static so the class stays a proper non-instantiable
utility holder.
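The suggestions above (name the fairness literal, make the utility non-instantiable) can be sketched in Python terms. This is only an analogue of the Java class: Python's `threading.Lock` has no fairness flag, so `FAIR_LOCK` is illustrative, and the module namespace plays the role of a non-instantiable utility class. All names here are hypothetical:

```python
import threading

# Hypothetical analogue of NotebookLockManager after the suggested cleanup:
# the policy literal is a named constant and lock creation is atomic.
FAIR_LOCK = True  # illustrative only; threading.Lock has no fairness option

_notebook_locks = {}
_registry_guard = threading.Lock()   # protects the dict of per-note locks

def get_notebook_lock(note_id):
    """Return the lock for note_id, creating it atomically if absent."""
    with _registry_guard:
        return _notebook_locks.setdefault(note_id, threading.Lock())
```

The registry guard gives the same create-once-per-notebook guarantee that `ConcurrentHashMap.computeIfAbsent` provides in the Java version.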
spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java (1)

54-89: Extract shared Spark Connect integration test harness.

This setup/context-builder pattern is repeated across multiple test classes. Moving it to a shared base/helper will reduce drift and simplify future changes.

As per coding guidelines, "if duplicate code exists it should be moved to a common method".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java`
around lines 54 - 89, The test harness setup/teardown and context builder in
SparkConnectInterpreterTest (setUp, tearDown, and getInterpreterContext) are
duplicated across tests—extract them into a shared test base or helper class:
create a common abstract base (e.g., SparkConnectTestBase) or a TestUtils helper
that provides the static setup/cleanup methods (initializing InterpreterGroup,
SparkConnectInterpreter, calling interpreter.open/close) and a reusable
getInterpreterContext builder that returns the configured InterpreterContext
(using AngularObjectRegistry, LocalResourcePool, InterpreterOutput, mocked
RemoteInterpreterEventClient); then update SparkConnectInterpreterTest to extend
or call that shared base/helper and remove the duplicated code from the test
class.
spark-connect/pom.xml (1)

37-38: Remove duplicated spark.connect.version declaration to prevent drift.

The same version is declared both globally and again in the default-active profile. Keeping one source of truth reduces maintenance risk.

As per coding guidelines, "if duplicate code exists it should be moved to a common method".

Also applies to: 124-126

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/pom.xml` around lines 37 - 38, There are two declarations of
the Maven property spark.connect.version (one global and one inside the
default-active profile) causing duplication and drift; remove the duplicate
inside the profile (or remove the global one if you intend the profile to be
canonical) so only one spark.connect.version property remains, and update/remove
the matching duplicate at the other spot referenced (the other occurrence around
the default-active profile) to ensure a single source of truth for Spark Connect
version.
spark-connect/src/main/resources/python/zeppelin_isparkconnect.py (3)

94-99: Chain the exception for better debugging context.

When re-raising ImportError, chain it with from None to indicate it's an intentional replacement.

♻️ Proposed fix
         except ImportError:
-            raise ImportError(
+            raise ImportError(
                 "pandas is required for toPandas(). "
-                "Install it with: pip install pandas")
+                "Install it with: pip install pandas") from None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py` around
lines 94 - 99, In the except ImportError block that handles "import pandas as
pd" (used by toPandas()), re-raise the ImportError using exception chaining with
"from None" so the new ImportError intentionally replaces the original (i.e.,
raise ImportError("pandas is required for toPandas(). Install it with: pip
install pandas") from None).

264-266: Add stacklevel=2 for proper warning attribution.

♻️ Proposed fix
             if isinstance(data, pd.DataFrame):
                 warnings.warn(
                     "createDataFrame from pandas goes through Py4j serialization. "
-                    "For large DataFrames, consider writing to a temp table instead.")
+                    "For large DataFrames, consider writing to a temp table instead.",
+                    stacklevel=2)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py` around
lines 264 - 266, The warnings.warn call that alerts users about "createDataFrame
from pandas goes through Py4j serialization..." should include stacklevel=2 so
the warning points to the user's call site; update the warnings.warn invocation
in zeppelin_isparkconnect.py (the warnings.warn(...) call with that exact
message) to pass stacklevel=2 as an argument (e.g.,
warnings.warn("createDataFrame ... temp table instead.", stacklevel=2)).

68-73: Add stacklevel=2 to warnings for proper caller attribution.

Without explicit stacklevel, warnings will point to this internal line rather than the user's code that triggered the warning.

♻️ Proposed fix
             if row_count > _COLLECT_WARN_THRESHOLD:
                 warnings.warn(
                     "Collecting %d rows to driver. This may cause OOM. "
                     "Consider using .limit() or .toPandas() with a smaller subset."
-                    % row_count)
+                    % row_count,
+                    stacklevel=2)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py` around
lines 68 - 73, The warnings.warn call inside the data-collection branch should
include stacklevel=2 so the warning points to the user's calling code; update
the warnings.warn invocation (the one immediately before return
list(self._jdf.collectAsList())) to pass stacklevel=2 as an argument while
keeping the existing message and interpolation unchanged.
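The effect of `stacklevel=2` can be seen in isolation: the reported location becomes the caller's line rather than the `warnings.warn` line inside the wrapper. A small self-contained demo (function name is hypothetical):

```python
import warnings

# With stacklevel=2 the warning is attributed to the line that called
# collect_with_warning, not to the warnings.warn call inside it.
def collect_with_warning(n_rows):
    warnings.warn("Collecting %d rows to driver." % n_rows, stacklevel=2)
    return n_rows

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    collect_with_warning(5000)  # the warning points at this line

assert len(caught) == 1
assert "5000 rows" in str(caught[0].message)
```

`caught[0].lineno` will be the call-site line above, which is exactly the attribution the review comments ask for.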
spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectSqlInterpreter.java (1)

60-65: Consider opening SparkConnectInterpreter explicitly for robustness.

The open() method resolves the SparkConnectInterpreter but doesn't explicitly open it. While Zeppelin may guarantee ordering, explicitly calling sparkConnectInterpreter.open() would make the dependency clear and prevent issues if initialization order changes.

♻️ Proposed fix
   `@Override`
   public void open() throws InterpreterException {
     this.sparkConnectInterpreter =
         getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
+    sparkConnectInterpreter.open();
     this.sqlSplitter = new SqlSplitter();
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectSqlInterpreter.java`
around lines 60 - 65, The open() method currently looks up the
SparkConnectInterpreter instance but doesn't explicitly initialize it; update
SparkConnectSqlInterpreter.open to invoke sparkConnectInterpreter.open() after
obtaining it (and before using sqlSplitter), propagating or handling
InterpreterException as appropriate so the dependent SparkConnectInterpreter is
explicitly opened; reference the sparkConnectInterpreter field and the open()
method on SparkConnectInterpreter when making this change.
spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java (1)

102-109: Redundant null check - the else branch is unreachable.

getProperty("zeppelin.python", "python") always returns at least "python" (the default), so StringUtils.isNotBlank(pythonExec) is always true and the final return "python" is dead code.

♻️ Proposed simplification
   `@Override`
   protected String getPythonExec() {
-    String pythonExec = getProperty("zeppelin.python", "python");
-    if (StringUtils.isNotBlank(pythonExec)) {
-      return pythonExec;
-    }
-    return "python";
+    return getProperty("zeppelin.python", "python");
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 102 - 109, The getPythonExec method contains a redundant non-blank
check because getProperty("zeppelin.python", "python") will always return a
non-null default; simplify by returning the property value directly. Update the
getPythonExec method in PySparkConnectInterpreter to simply return
getProperty("zeppelin.python", "python") (remove the StringUtils.isNotBlank
check and the unreachable final return) so only the direct call to getProperty
remains.
spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java (2)

41-42: Consider resetting opened flag in close() to support interpreter restart.

The opened flag prevents double-open but is never reset in close(), which could prevent the interpreter from being reopened after closing.

♻️ Proposed fix in close()
   `@Override`
   public void close() throws InterpreterException {
     LOGGER.info("Close IPySparkConnectInterpreter");
     super.close();
+    opened = false;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 41 - 42, The opened flag is set to prevent double-open but isn't
reset on shutdown; update the IPySparkConnectInterpreter.close() method to set
opened = false (and clear any state like curIntpContext if appropriate) so the
interpreter can be reopened after close; apply this change inside the close()
implementation of IPySparkConnectInterpreter to mirror the semantics of open()
and ensure proper restart behavior.

122-125: Unchecked cast is documented but lacks runtime safety.

While @SuppressWarnings is present, if a non-Dataset object is passed, this will throw a ClassCastException at runtime. Consider adding a type check or documenting the contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 122 - 125, The method formatDataFrame currently performs an
unchecked cast to (Dataset<Row>) and can throw ClassCastException at runtime;
update formatDataFrame to validate the input before casting (e.g., check that df
instanceof Dataset<?> and that its row type is compatible) and handle invalid
types by throwing a clear IllegalArgumentException or returning a descriptive
error string instead of allowing a ClassCastException; keep the call to
SparkConnectUtils.showDataFrame for valid Dataset<Row> inputs and include the
method name formatDataFrame, the target cast to Dataset<Row>, and the use of
SparkConnectUtils.showDataFrame when locating and modifying the code.
spark-connect/src/main/resources/python/zeppelin_sparkconnect.py (1)

1-310: Consider extracting shared code to reduce duplication.

zeppelin_sparkconnect.py and zeppelin_isparkconnect.py are nearly identical. Consider extracting the common SparkConnectDataFrame and SparkConnectSession classes into a shared module to reduce maintenance burden and ensure consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_sparkconnect.py` around
lines 1 - 310, The two files duplicate the SparkConnectDataFrame and
SparkConnectSession implementations; extract these classes and any helper
functions/constants they use (e.g., SparkConnectDataFrame, SparkConnectSession,
_rows_to_dicts, _COLLECT_LIMIT_DEFAULT, _COLLECT_WARN_THRESHOLD) into a new
shared module (e.g., zeppelin_sparkconnect_common) and replace the in-file
definitions in both zeppelin_sparkconnect.py and zeppelin_isparkconnect.py with
imports from that module; ensure each file still initializes its
gateway/entry_point/_max_result and passes or sets any module-level state the
shared module relies on, update imports and references (SparkConnectDataFrame,
SparkConnectSession) accordingly, and run tests to confirm behavior is
unchanged.
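The extraction suggested above can be sketched by showing what one of the shared helpers might look like once it lives in a common module. The module name, threshold, and signature below are illustrative assumptions, not taken from the PR:

```python
import warnings

# Hypothetical content of a shared zeppelin_sparkconnect_common module:
# the collect guard that both wrapper files currently duplicate.
_COLLECT_WARN_THRESHOLD = 1000   # assumed default, not taken from the PR

def safe_collect(rows, limit=-1):
    """Return at most `limit` rows (limit=-1 means all), warning when large."""
    result = list(rows) if limit == -1 else list(rows)[:limit]
    if len(result) > _COLLECT_WARN_THRESHOLD:
        warnings.warn(
            "Collecting %d rows to driver. This may cause OOM. "
            "Consider using .limit() or a smaller subset." % len(result),
            stacklevel=2)
    return result
```

Both `zeppelin_sparkconnect.py` and `zeppelin_isparkconnect.py` would then import this function instead of defining it twice, so a future change to the threshold or message lands in one place.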
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java`:
- Around line 51-53: removeNotebookLock unconditionally removes entries from
notebookLocks which can create a new lock while the old one is still held;
change removeNotebookLock to only remove the map entry when the current lock for
noteId is not held and has no queued threads. Locate notebookLocks and the
methods removeNotebookLock and getNotebookLock, obtain the ReentrantLock
instance from notebookLocks (e.g., via get/computeIfPresent), and check
lock.isLocked() and lock.hasQueuedThreads() (or attempt a non-blocking tryLock
and immediately unlock) before calling notebookLocks.remove(noteId) so you only
remove locks that are truly free.

In `@spark-connect/src/main/resources/interpreter-setting.json`:
- Around line 111-117: The "spark.connect.token" interpreter setting is
currently declared with "type": "string" and must be treated as sensitive;
update the "spark.connect.token" entry in interpreter-setting.json to use
"type": "password" (matching other secret fields like jdbc/mongodb entries) so
the token is masked in UI/exports, keeping the same envName, propertyName and
defaultValue.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py`:
- Around line 136-137: The groupBy method currently returns the raw Java
GroupedData from self._jdf, breaking the wrapper pattern; update
SparkConnectDataFrame.groupBy to wrap the result in the corresponding Python
wrapper (e.g., a GroupedData / SparkConnectGroupedData instance) instead of
returning the raw Java object so callers get the same high-level API as other
transformations; locate groupBy and construct/return the proper wrapper (passing
the Java object and any required session/context like self._session) using the
existing GroupedData wrapper class used elsewhere in the module.
- Around line 52-56: The show method currently ignores the truncate parameter;
update the show function (def show) to either remove the unused truncate
argument or forward it to intp.formatDataFrame so truncation is honored—e.g.,
use the truncate value when calling intp.formatDataFrame(self._jdf, effective_n,
truncate) if the Java/Python interop supports a truncate argument, or if not
supported, drop the truncate parameter from the show signature and update any
callers; reference the show method, its truncate parameter, self._jdf and the
intp.formatDataFrame call when making the change.

In `@spark-connect/src/main/resources/python/zeppelin_sparkconnect.py`:
- Around line 54-56: The show method currently ignores the truncate parameter;
update the show(self, n=20, truncate=True) implementation to use truncate when
formatting the DataFrame (e.g., pass the truncate flag through to
intp.formatDataFrame or apply equivalent truncation logic before printing) so
that the truncate argument affects output; locate the show method and modify the
call to intp.formatDataFrame(self._jdf, effective_n) to include the truncate
behavior using the truncate variable.
- Around line 141-142: The groupBy method currently returns the raw Java
GroupedData object from self._jdf.groupBy(*cols); change it to return the
wrapped GroupedData wrapper (e.g., construct and return
GroupedData(self._jdf.groupBy(*cols)) or whatever local wrapper class is used)
so the wrapper pattern is preserved; update the groupBy method and add any
necessary import/reference to the wrapper class (GroupedData) used elsewhere in
this module.
- Around line 89-98: The docstring for toPandas incorrectly states that it tries
to use pyarrow; update the toPandas method docstring to accurately describe the
current implementation: remove any mention of pyarrow and state that it converts
rows row-by-row via Py4j with a safety limit (limit argument, default from
zeppelin.spark.maxResult, and limit=-1 for all rows). Keep the Args section and
wording consistent with the actual behavior in toPandas.

In
`@spark-connect/src/test/java/org/apache/zeppelin/spark/PySparkConnectInterpreterTest.java`:
- Around line 116-117: The OR-based assertions in PySparkConnectInterpreterTest
(the assertTrue checks that use output.contains("id") ||
output.contains("message") || output.contains("hello") and the ones at the other
noted locations) are too permissive — update them to assert specific,
deterministic output by checking all expected tokens together (e.g.,
assertTrue(output.contains("id") && output.contains("message") &&
output.contains("hello")) or better,
assertTrue(output.containsAll(expectedColumnsList)) or assertEquals on the exact
string/JSON you expect), replace the ambiguous SUCCESS/ERROR acceptance with an
assertEquals to the single expected status value, and for the delta-case create
or mock deterministic test data so the expected columns/values are known; locate
and change the assertions in PySparkConnectInterpreterTest (the OR-based
assertions and the SUCCESS/ERROR branch) and adjust test setup for the delta
path to produce deterministic output.

In
`@spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java`:
- Around line 72-77: The teardown only closes the Interpreter instance; also
ensure the InterpreterGroup is cleaned up to avoid lingering resources by
updating the tearDown method to check if the shared InterpreterGroup
(InterpreterGroup variable) is non-null and invoke its close/cleanup method
after closing the interpreter (e.g., if (interpreterGroup != null) {
interpreterGroup.close(); }), making sure to reference the existing interpreter
and InterpreterGroup symbols and handle exceptions similar to
interpreter.close().


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e5d0b9 and 95c600b.

📒 Files selected for processing (15)
  • pom.xml
  • spark-connect/pom.xml
  • spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectSqlInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java
  • spark-connect/src/main/resources/interpreter-setting.json
  • spark-connect/src/main/resources/python/zeppelin_isparkconnect.py
  • spark-connect/src/main/resources/python/zeppelin_sparkconnect.py
  • spark-connect/src/test/java/org/apache/zeppelin/spark/PySparkConnectInterpreterTest.java
  • spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java
  • spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectSqlInterpreterTest.java
  • spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectUtilsTest.java

Comment on lines +51 to +53
public static void removeNotebookLock(String noteId) {
notebookLocks.remove(noteId);
}

⚠️ Potential issue | 🟠 Major

Avoid removing notebook locks while they may still be active.

Line 52 removes the lock unconditionally. If a thread still holds the old lock and another thread calls getNotebookLock, a new lock can be created for the same notebook, allowing concurrent execution.

🧵 Safer cleanup approach
   public static void removeNotebookLock(String noteId) {
-    notebookLocks.remove(noteId);
+    notebookLocks.computeIfPresent(noteId, (id, lock) -> {
+      if (lock.isLocked() || lock.hasQueuedThreads()) {
+        return lock;
+      }
+      return null; // remove only when truly idle
+    });
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/NotebookLockManager.java`
around lines 51 - 53, removeNotebookLock unconditionally removes entries from
notebookLocks which can create a new lock while the old one is still held;
change removeNotebookLock to only remove the map entry when the current lock for
noteId is not held and has no queued threads. Locate notebookLocks and the
methods removeNotebookLock and getNotebookLock, obtain the ReentrantLock
instance from notebookLocks (e.g., via get/computeIfPresent), and check
lock.isLocked() and lock.hasQueuedThreads() (or attempt a non-blocking tryLock
and immediately unlock) before calling notebookLocks.remove(noteId) so you only
remove locks that are truly free.

Comment on lines +111 to +117
"spark.connect.token": {
"envName": "SPARK_CONNECT_TOKEN",
"propertyName": "spark.connect.token",
"defaultValue": "",
"description": "Authentication token for Spark Connect (optional)",
"type": "string"
},

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, locate and verify the target file exists
find . -name "interpreter-setting.json" -type f

Repository: Meesho/zeppelin

Length of output: 2075


🏁 Script executed:

# Run the provided verification script to check for password-type usage
rg -n '"type"\s*:\s*"password"' --glob '**/interpreter-setting.json'

Repository: Meesho/zeppelin

Length of output: 559


🏁 Script executed:

# Examine the structure of the spark-connect interpreter-setting.json file around lines 111-117
if [ -f "spark-connect/src/main/resources/interpreter-setting.json" ]; then
  cat -n "spark-connect/src/main/resources/interpreter-setting.json" | sed -n '105,125p'
else
  echo "File not found at expected path"
fi

Repository: Meesho/zeppelin

Length of output: 1010


🏁 Script executed:

# Search for other authentication token fields to understand naming patterns
rg -i '"token"' --glob '**/interpreter-setting.json' -A 3

Repository: Meesho/zeppelin

Length of output: 41


Mark spark.connect.token as a secret field.

Line 116 uses "type": "string" for an authentication token. Authentication tokens should be masked as sensitive values in interpreter settings. The repository already uses "type": "password" for other authentication fields (e.g., in jdbc, mongodb, elasticsearch, cassandra, and influxdb interpreters).

🔐 Suggested change
       "spark.connect.token": {
         "envName": "SPARK_CONNECT_TOKEN",
         "propertyName": "spark.connect.token",
         "defaultValue": "",
         "description": "Authentication token for Spark Connect (optional)",
-        "type": "string"
+        "type": "password"
       },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/interpreter-setting.json` around lines 111 -
117, The "spark.connect.token" interpreter setting is currently declared with
"type": "string" and must be treated as sensitive; update the
"spark.connect.token" entry in interpreter-setting.json to use "type":
"password" (matching other secret fields like jdbc/mongodb entries) so the token
is masked in UI/exports, keeping the same envName, propertyName and
defaultValue.

Comment on lines +52 to +56
def show(self, n=20, truncate=True):
effective_n = min(n, _max_result)
print(intp.formatDataFrame(self._jdf, effective_n))

def collect(self, limit=None):

⚠️ Potential issue | 🟡 Minor

Unused truncate parameter in show() method.

The truncate argument is accepted but never used. This could mislead users expecting truncation control.

🔧 Proposed fix

Either remove the unused parameter or pass it to formatDataFrame:

-    def show(self, n=20, truncate=True):
-        effective_n = min(n, _max_result)
-        print(intp.formatDataFrame(self._jdf, effective_n))
+    def show(self, n=20, truncate=True):
+        """Show DataFrame rows. Note: truncate parameter is currently not supported."""
+        effective_n = min(n, _max_result)
+        print(intp.formatDataFrame(self._jdf, effective_n))

Alternatively, if the Java side supports truncation, wire it through.

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 52-52: Unused method argument: truncate

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py` around
lines 52 - 56, The show method currently ignores the truncate parameter; update
the show function (def show) to either remove the unused truncate argument or
forward it to intp.formatDataFrame so truncation is honored—e.g., use the
truncate value when calling intp.formatDataFrame(self._jdf, effective_n,
truncate) if the Java/Python interop supports a truncate argument, or if not
supported, drop the truncate parameter from the show signature and update any
callers; reference the show method, its truncate parameter, self._jdf and the
intp.formatDataFrame call when making the change.

Comment on lines +136 to +137
def groupBy(self, *cols):
return self._jdf.groupBy(*cols)

⚠️ Potential issue | 🟡 Minor

groupBy returns raw Java object instead of wrapped DataFrame.

Unlike other transformation methods that return SparkConnectDataFrame, groupBy returns the raw Java GroupedData object, breaking the wrapper pattern and exposing internals.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_isparkconnect.py` around
lines 136 - 137, The groupBy method currently returns the raw Java GroupedData
from self._jdf, breaking the wrapper pattern; update
SparkConnectDataFrame.groupBy to wrap the result in the corresponding Python
wrapper (e.g., a GroupedData / SparkConnectGroupedData instance) instead of
returning the raw Java object so callers get the same high-level API as other
transformations; locate groupBy and construct/return the proper wrapper (passing
the Java object and any required session/context like self._session) using the
existing GroupedData wrapper class used elsewhere in the module.
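The wrapper pattern the prompt asks for can be as thin as the sketch below — a class that holds the Java object and re-wraps every result. The class name and the `wrap` callback are hypothetical; the real module would pass its own DataFrame constructor:

```python
class SparkConnectGroupedData:
    """Hypothetical thin wrapper that keeps aggregation results inside the
    wrapper pattern instead of leaking the raw Java GroupedData."""

    def __init__(self, jgrouped, wrap):
        self._jgrouped = jgrouped
        self._wrap = wrap  # callable that wraps a Java DataFrame

    def agg(self, *exprs):
        return self._wrap(self._jgrouped.agg(*exprs))

    def count(self):
        return self._wrap(self._jgrouped.count())
```

`groupBy` would then return `SparkConnectGroupedData(self._jdf.groupBy(*cols), SparkConnectDataFrame)` so callers keep the high-level API.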

Comment on lines +54 to +56
def show(self, n=20, truncate=True):
effective_n = min(n, _max_result)
print(intp.formatDataFrame(self._jdf, effective_n))

⚠️ Potential issue | 🟡 Minor

Unused truncate parameter: same issue as in zeppelin_isparkconnect.py.

The truncate argument is accepted but never used, which could mislead users.

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 54-54: Unused method argument: truncate

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_sparkconnect.py` around
lines 54 - 56, The show method currently ignores the truncate parameter; update
the show(self, n=20, truncate=True) implementation to use truncate when
formatting the DataFrame (e.g., pass the truncate flag through to
intp.formatDataFrame or apply equivalent truncation logic before printing) so
that the truncate argument affects output; locate the show method and modify the
call to intp.formatDataFrame(self._jdf, effective_n) to include the truncate
behavior using the truncate variable.

Comment on lines +89 to +98
def toPandas(self, limit=None):
"""Convert to pandas DataFrame. Applies a safety limit.

Tries to use pyarrow for efficient serialization if available,
otherwise falls back to row-by-row conversion through Py4j.

Args:
limit: Max rows. Defaults to zeppelin.spark.maxResult.
Pass limit=-1 for all rows (use with caution on large data).
"""

⚠️ Potential issue | 🟡 Minor

Misleading docstring: mentions pyarrow but doesn't use it.

The docstring claims "Tries to use pyarrow for efficient serialization if available" but the implementation never checks for or uses pyarrow; it always uses row-by-row conversion through Py4j.

📝 Proposed fix - update docstring to match implementation
     def toPandas(self, limit=None):
         """Convert to pandas DataFrame. Applies a safety limit.
 
-        Tries to use pyarrow for efficient serialization if available,
-        otherwise falls back to row-by-row conversion through Py4j.
+        Uses row-by-row conversion through Py4j.
 
         Args:
             limit: Max rows. Defaults to zeppelin.spark.maxResult.
                    Pass limit=-1 for all rows (use with caution on large data).
         """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_sparkconnect.py` around
lines 89 - 98, The docstring for toPandas incorrectly states that it tries to
use pyarrow; update the toPandas method docstring to accurately describe the
current implementation: remove any mention of pyarrow and state that it converts
rows row-by-row via Py4j with a safety limit (limit argument, default from
zeppelin.spark.maxResult, and limit=-1 for all rows). Keep the Args section and
wording consistent with the actual behavior in toPandas.

Comment on lines +141 to +142
def groupBy(self, *cols):
return self._jdf.groupBy(*cols)

⚠️ Potential issue | 🟡 Minor

groupBy returns raw Java object instead of wrapped DataFrame.

Same issue as in zeppelin_isparkconnect.py - breaks the wrapper pattern by returning unwrapped Java GroupedData.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/python/zeppelin_sparkconnect.py` around
lines 141 - 142, The groupBy method currently returns the raw Java GroupedData
object from self._jdf.groupBy(*cols); change it to return the wrapped
GroupedData wrapper (e.g., construct and return
GroupedData(self._jdf.groupBy(*cols)) or whatever local wrapper class is used)
so the wrapper pattern is preserved; update the groupBy method and add any
necessary import/reference to the wrapper class (GroupedData) used elsewhere in
this module.

Comment on lines +116 to +117
assertTrue(output.contains("id") || output.contains("message") || output.contains("hello"),
"Output should contain query results: " + output);

⚠️ Potential issue | 🟡 Minor

Tighten assertions—current checks are too permissive to catch regressions reliably.

The OR-based output checks (Lines 116-117, 127-128, 151-152) can pass on partial/incorrect outputs, and Line 139-141 accepts both SUCCESS and ERROR. Consider making these assertions deterministic (e.g., assert expected columns/values together, or set up deterministic test data for the delta-case path).

Also applies to: 127-128, 139-141, 151-152

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/test/java/org/apache/zeppelin/spark/PySparkConnectInterpreterTest.java`
around lines 116 - 117, The OR-based assertions in PySparkConnectInterpreterTest
(the assertTrue checks that use output.contains("id") ||
output.contains("message") || output.contains("hello") and the ones at the other
noted locations) are too permissive — update them to assert specific,
deterministic output by checking all expected tokens together (e.g.,
assertTrue(output.contains("id") && output.contains("message") &&
output.contains("hello")) or better,
assertTrue(output.containsAll(expectedColumnsList)) or assertEquals on the exact
string/JSON you expect), replace the ambiguous SUCCESS/ERROR acceptance with an
assertEquals to the single expected status value, and for the delta-case create
or mock deterministic test data so the expected columns/values are known; locate
and change the assertions in PySparkConnectInterpreterTest (the OR-based
assertions and the SUCCESS/ERROR branch) and adjust test setup for the delta
path to produce deterministic output.
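The "check all expected tokens together" suggestion can be captured in a small helper that also reports exactly which tokens were missing, which makes failures diagnosable. A sketch (helper name is hypothetical; the Java tests would use the equivalent AND-combined assertion):

```python
def assert_contains_all(output, expected_tokens):
    """Deterministic alternative to OR-chained contains checks: every
    expected token must be present, and missing ones are reported."""
    missing = [tok for tok in expected_tokens if tok not in output]
    assert not missing, "missing %s in output: %s" % (missing, output)
```

Unlike `a or b or c`, this cannot pass on a partial result, and the failure message names the absent columns/values.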

Comment on lines +72 to +77
@AfterAll
public static void tearDown() throws InterpreterException {
if (interpreter != null) {
interpreter.close();
}
}

⚠️ Potential issue | 🟡 Minor

Close the InterpreterGroup in teardown as well.

InterpreterGroup owns session/lifecycle state. Line 73 currently only closes the interpreter instance; group cleanup should also run to avoid lingering resources in integration test runs.

🧹 Suggested teardown update
   @AfterAll
   public static void tearDown() throws InterpreterException {
-    if (interpreter != null) {
-      interpreter.close();
-    }
+    if (intpGroup != null) {
+      intpGroup.close();
+    } else if (interpreter != null) {
+      interpreter.close();
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectInterpreterTest.java`
around lines 72 - 77, The teardown only closes the Interpreter instance; also
ensure the InterpreterGroup is cleaned up to avoid lingering resources by
updating the tearDown method to check if the shared InterpreterGroup
(InterpreterGroup variable) is non-null and invoke its close/cleanup method
after closing the interpreter (e.g., if (interpreterGroup != null) {
interpreterGroup.close(); }), making sure to reference the existing interpreter
and InterpreterGroup symbols and handle exceptions similar to
interpreter.close().

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🧹 Nitpick comments (1)
spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java (1)

189-190: Add lock cleanup to prevent unbounded lock map growth.

The lock acquire/release pattern is correct, but lock objects accumulate indefinitely. NotebookLockManager.getNotebookLock(noteId) creates entries in a static ConcurrentHashMap that are never removed via removeNotebookLock(noteId). In long-running systems with many notebooks, this causes unbounded memory growth. The close() method should clean up the lock for its notebook.

The same issue exists in SparkConnectSqlInterpreter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
around lines 189 - 190, Add cleanup of the per-notebook lock in the interpreter
shutdown path: in SparkConnectInterpreter.close() (and mirror the same change in
SparkConnectSqlInterpreter.close()), ensure you release the held ReentrantLock
(notebookLock.unlock()) in a try/finally around any locked work, and then call
NotebookLockManager.removeNotebookLock(noteId) to remove the entry from the
static ConcurrentHashMap so locks do not accumulate; ensure removal happens
after unlocking and only for the notebook's lock instance.
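The unlock-then-remove ordering in the prompt can be sketched as a small close-path helper. This is a Python analogue of the Java pattern (names hypothetical): the locked work runs in try/finally, the lock is released first, and only then is the registry entry dropped so the map cannot grow without bound:

```python
import threading

def run_with_notebook_lock(lock, registry, note_id, body):
    """Hypothetical close-path pattern: unlock in finally, then drop the
    registry entry (the analogue of removeNotebookLock)."""
    lock.acquire()
    try:
        body()
    finally:
        lock.release()               # release before removal
        registry.pop(note_id, None)  # cleanup keeps the map bounded
```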
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`:
- Around line 142-147: The getMaxResult method in PySparkConnectInterpreter
currently calls Integer.parseInt(getProperty("zeppelin.spark.maxResult",
"1000")) which will throw NumberFormatException for non-numeric values; change
getMaxResult to defensively parse the property inside a try-catch (catch
NumberFormatException) and return the default 1000 when parsing fails or the
property is malformed, and optionally log a warning via the class logger to
indicate the fallback; keep the existing branch that defers to
sparkConnectInterpreter.getMaxResult() when sparkConnectInterpreter != null.
- Around line 111-117: The resolvePythonBinary method currently calls
p.waitFor() which can block indefinitely; change it to use p.waitFor(timeout,
TimeUnit.SECONDS) with a sensible short timeout (e.g., a few seconds) and, if it
returns false, call p.destroyForcibly() and continue to the next candidate;
ensure you handle InterruptedException and IOException the same way you do now
and close any streams from the Process to avoid resource leaks when using
Process p in resolvePythonBinary.
- Around line 55-69:
getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class) can
return null so add a null check after that call and throw an
InterpreterException (with LOGGER.error) if null instead of calling
sparkConnectInterpreter.open(); additionally protect against partial opens by
tracking which components succeeded (sparkConnectInterpreter.open() and
super.open()) and on any subsequent failure (e.g. bootstrapInterpreter throws)
close/cleanup any interpreter that was opened before rethrowing the
InterpreterException; update error messages to include context (method names)
and reuse LOGGER for logging.
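The resolvePythonBinary finding above — never block forever while probing a candidate binary — has a direct Python analogue using `subprocess.run` with a timeout, shown here as an illustrative sketch (the probe function name and the 5-second budget are assumptions):

```python
import subprocess

def probe_binary(candidate, timeout_s=5):
    """Bounded probe: the counterpart of p.waitFor(timeout, SECONDS).
    A timeout or launch failure simply marks the candidate unusable,
    so the caller can move on to the next one."""
    try:
        proc = subprocess.run(
            [candidate, "--version"],
            capture_output=True,   # also drains stdout/stderr, avoiding leaks
            timeout=timeout_s,
        )
        return proc.returncode == 0
    except (OSError, subprocess.TimeoutExpired):
        return False
```

On timeout, `subprocess.run` kills the child before raising, which parallels the suggested `p.destroyForcibly()` when `waitFor` returns false.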

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`:
- Around line 153-166: The close() method in SparkConnectInterpreter must be
made thread-safe to avoid double release of session slots: synchronize access to
the sessionSlotAcquired flag and the call to releaseSessionSlot so that only one
thread can observe-and-clear the flag and perform release; e.g., add a
synchronized method or a synchronized(this) block around the section that checks
sessionSlotAcquired, calls releaseSessionSlot(currentUser) and sets
sessionSlotAcquired = false, while leaving sparkSession.close() handling as-is
(or also inside the same synchronization if preferred) to ensure release happens
exactly once.
- Around line 193-194: The parsing of the "limit" property in
SparkConnectInterpreter (the line using Integer.parseInt on
context.getLocalProperties().getOrDefault) can throw NumberFormatException and
bypass the SQL-error handling; modify this by validating/parsing the value
defensively: wrap the parse in a try/catch (catch NumberFormatException), reject
non-positive or malformed values (e.g., NaN, negative, zero) by falling back to
maxResult or clamping within a safe range, and ensure any parse error is handled
and logged consistently so it flows through the same SQL-error path as other
runtime errors; update references to context.getLocalProperties(), the 'limit'
variable and 'maxResult' accordingly.
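The defensive parse-and-clamp described for the 'limit' property can be sketched as follows (function name hypothetical; the Java version would catch NumberFormatException the same way):

```python
def resolve_limit(raw, max_result):
    """Defensive counterpart of the 'limit' local-property parse: malformed
    or non-positive values fall back to max_result; valid values are
    clamped to the max_result safety ceiling."""
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return max_result
    if value <= 0:
        return max_result
    return min(value, max_result)
```

The same shape also covers the getMaxResult finding: one try/catch, one fallback, no exception escaping past the SQL-error handling.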

---

Nitpick comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`:
- Around line 189-190: Add cleanup of the per-notebook lock in the interpreter
shutdown path: in SparkConnectInterpreter.close() (and mirror the same change in
SparkConnectSqlInterpreter.close()), ensure you release the held ReentrantLock
(notebookLock.unlock()) in a try/finally around any locked work, and then call
NotebookLockManager.removeNotebookLock(noteId) to remove the entry from the
static ConcurrentHashMap so locks do not accumulate; ensure removal happens
after unlocking and only for the notebook's lock instance.
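The close() thread-safety finding for SparkConnectInterpreter — observe-and-clear the acquired flag under a lock so the session slot is released exactly once — can be demonstrated with this Python analogue (class and field names hypothetical):

```python
import threading

class SessionSlot:
    """Sketch of the synchronized observe-and-clear: close() may be called
    from multiple threads, but the slot is released exactly once."""

    def __init__(self):
        self._acquired = True
        self._guard = threading.Lock()   # analogue of synchronized(this)
        self.release_count = 0

    def close(self):
        with self._guard:
            if self._acquired:
                self._acquired = False
                self.release_count += 1  # stands in for releaseSessionSlot(user)
```

Without the guard, two threads can both see `_acquired == True` and double-release; with it, the check and the clear are atomic.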

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c6eae214-23e0-4acf-9613-6a1d0d2b91bd

📥 Commits

Reviewing files that changed from the base of the PR and between 95c600b and 0e79887.

📒 Files selected for processing (3)
  • spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java
  • spark-connect/src/main/resources/interpreter-setting.json
✅ Files skipped from review due to trivial changes (1)
  • spark-connect/src/main/resources/interpreter-setting.json

Comment on lines +55 to +69
this.sparkConnectInterpreter =
getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);

// Ensure the Java SparkSession is ready before starting Python
sparkConnectInterpreter.open();

super.open();

if (!useIPython()) {
try {
bootstrapInterpreter("python/zeppelin_sparkconnect.py");
} catch (IOException e) {
LOGGER.error("Fail to bootstrap spark connect", e);
throw new InterpreterException("Fail to bootstrap spark connect", e);
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "PySparkConnectInterpreter.java" | head -5

Repository: Meesho/zeppelin

Length of output: 145


🏁 Script executed:

# Get the complete file to understand the full context
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java

Repository: Meesho/zeppelin

Length of output: 6112


🏁 Script executed:

# Find the getInterpreterInTheSameSessionByClassName method
rg -n "getInterpreterInTheSameSessionByClassName" --type java -B 3 -A 10

Repository: Meesho/zeppelin

Length of output: 50371


🏁 Script executed:

# Check for any existing error handling or try-catch blocks in the open method
rg -n "public void open\(\)" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java -A 30

Repository: Meesho/zeppelin

Length of output: 1121


🏁 Script executed:

# Check PythonInterpreter.close() to understand if it's safe to call
rg -n "class PythonInterpreter" python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java -A 5 | head -20

Repository: Meesho/zeppelin

Length of output: 364


🏁 Script executed:

# Check if there are other examples of try-catch with cleanup in open methods
rg -n "public void open" spark-connect/src/main/java -A 20 | grep -A 20 "catch.*close"

Repository: Meesho/zeppelin

Length of output: 41


🏁 Script executed:

# Look for how IPySparkConnectInterpreter handles the open with similar dependencies
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java | sed -n '40,70p'

Repository: Meesho/zeppelin

Length of output: 1276


Add null guard and cleanup on open failure.

getInterpreterInTheSameSessionByClassName() can return null (line 56), causing NPE when calling sparkConnectInterpreter.open() at line 59. Additionally, if sparkConnectInterpreter.open() or super.open() succeeds but a later operation fails, the interpreter is left partially open with no cleanup.

Suggested fix
   this.sparkConnectInterpreter =
       getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
+  if (this.sparkConnectInterpreter == null) {
+    throw new InterpreterException("SparkConnectInterpreter is required in the same session");
+  }

-  // Ensure the Java SparkSession is ready before starting Python
-  sparkConnectInterpreter.open();
-
-  super.open();
-
-  if (!useIPython()) {
-    try {
-      bootstrapInterpreter("python/zeppelin_sparkconnect.py");
-    } catch (IOException e) {
-      LOGGER.error("Fail to bootstrap spark connect", e);
-      throw new InterpreterException("Fail to bootstrap spark connect", e);
-    }
-  }
+  try {
+    // Ensure the Java SparkSession is ready before starting Python
+    sparkConnectInterpreter.open();
+    super.open();
+    if (!useIPython()) {
+      bootstrapInterpreter("python/zeppelin_sparkconnect.py");
+    }
+  } catch (IOException e) {
+    try {
+      super.close();
+    } catch (InterpreterException closeEx) {
+      LOGGER.warn("Failed to close python interpreter after bootstrap failure", closeEx);
+    }
+    LOGGER.error("Fail to bootstrap spark connect", e);
+    throw new InterpreterException("Fail to bootstrap spark connect", e);
+  } catch (InterpreterException e) {
+    try {
+      super.close();
+    } catch (InterpreterException closeEx) {
+      LOGGER.warn("Failed to close python interpreter after open failure", closeEx);
+    }
+    throw e;
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 55 - 69,
getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class) can
return null so add a null check after that call and throw an
InterpreterException (with LOGGER.error) if null instead of calling
sparkConnectInterpreter.open(); additionally protect against partial opens by
tracking which components succeeded (sparkConnectInterpreter.open() and
super.open()) and on any subsequent failure (e.g. bootstrapInterpreter throws)
close/cleanup any interpreter that was opened before rethrowing the
InterpreterException; update error messages to include context (method names)
and reuse LOGGER for logging.

Comment on lines +86 to +100
public org.apache.zeppelin.interpreter.InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
curIntpContext = context;
return super.interpret(st, context);
}

@Override
protected void preCallPython(InterpreterContext context) {
callPython(new PythonInterpretRequest(
"intp.setInterpreterContextInPython()", false, false));
}

public void setInterpreterContextInPython() {
InterpreterContext.set(curIntpContext);
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Scripts executed (repository: Meesho/zeppelin; collapsed outputs omitted):

git ls-files | grep -i "PySparkConnectInterpreter"
cat spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | wc -l
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | head -150
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | grep -A 5 -B 5 "curIntpContext"
find . -name "PythonInterpreter.java" | head -5
find . -name "Interpreter.java" | grep -v ".class" | head -5
cat spark-connect/src/test/java/org/apache/zeppelin/spark/PySparkConnectInterpreterTest.java
cat -n ./python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | head -200
grep -n "protected void preCallPython\|public InterpreterResult interpret" ./python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | head -20
cat -n ./zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java | head -200
cat -n ./python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | sed -n '351,410p'
cat -n ./python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | sed -n '375,430p'
grep -n "getScheduler" ./python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
grep -n "getScheduler" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java
grep -rn "createOrGetParallelScheduler\|Parallel" --include="*.java" | grep -i "getScheduler\|PySparkConnect" | head -20
rg "interpret\(.*context\)" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java -B 5 -A 10
rg "setInterpreterContextInPython" --include="*.java" -B 3 -A 3
rg "setInterpreterContextInPython" -B 3 -A 3
rg "class.*Interpreter.*getScheduler" spark-connect/src/main/java/org/apache/zeppelin/spark/ -A 5
cat -n ./zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java | sed -n '130,150p'
rg "getScheduler" spark-connect/src/main/java/org/apache/zeppelin/spark/ -A 3
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java | head -60
rg "curIntpContext" spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java -B 2 -A 2 | head -40
grep -rn "createOrGetParallelScheduler" spark-connect/src/main/java/org/apache/zeppelin/spark/
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | sed -n '38,50p'
rg "InterpreterContext.set" python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java -B 3 -A 3
rg "getScheduler" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java
grep -rn "ParallelScheduler\|concurrentSQL\|pyspark.connect" spark-connect/src/main/java/org/apache/zeppelin/spark/
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectSqlInterpreter.java | sed -n '180,200p'

Shared curIntpContext field creates race condition across concurrent interpret() calls.

curIntpContext is per-call state but stored in an unsynchronized instance field. Although PySparkConnectInterpreter uses the default FIFO scheduler, nothing prevents concurrent threads from calling interpret() directly, allowing one thread to overwrite curIntpContext before another thread's asynchronous Python callback executes, binding the wrong InterpreterContext.

Suggested fix
-  private InterpreterContext curIntpContext;
+  private volatile InterpreterContext curIntpContext;
@@
-  public org.apache.zeppelin.interpreter.InterpreterResult interpret(String st,
+  public synchronized org.apache.zeppelin.interpreter.InterpreterResult interpret(String st,
       InterpreterContext context) throws InterpreterException {
     curIntpContext = context;
-    return super.interpret(st, context);
+    try {
+      return super.interpret(st, context);
+    } finally {
+      curIntpContext = null;
+    }
   }
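
The overwrite hazard can be shown with a deterministic sketch. The `SharedContextHazard` class below is hypothetical and stands in for the interpreter's shared field; it compresses the timing into sequential calls to make the wrong binding visible:

```java
/**
 * Hypothetical illustration of the curIntpContext hazard: two interpret()
 * calls share one instance field, so a callback that fires after a second
 * call observes the second call's context, not its own.
 */
public class SharedContextHazard {
  private String curContext;  // stands in for the shared InterpreterContext field

  public void interpret(String context) {
    curContext = context;     // per-call state written into shared instance state
  }

  public String callbackReadsContext() {
    return curContext;        // an async Python callback binds whatever is current
  }
}
```

Running two "paragraphs" back to back shows the first paragraph's callback would bind the second paragraph's context, which is exactly why the suggested fix serializes `interpret()` and clears the field afterwards.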

Comment on lines +111 to +117
private String resolvePythonBinary() {
for (String candidate : new String[]{"python3", "python"}) {
try {
Process p = new ProcessBuilder(candidate, "--version")
.redirectErrorStream(true).start();
int exit = p.waitFor();
if (exit == 0) {

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Scripts executed (repository: Meesho/zeppelin; collapsed outputs omitted):

git ls-files | grep -i "pyspark.*interpreter"
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | head -150
rg "waitFor" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java -B 3 -A 3
rg "import.*TimeUnit|import java.util.concurrent" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java


Add a timeout when probing Python binaries.

waitFor() without timeout at line 116 can block interpreter startup indefinitely if the child process hangs. Replace with waitFor(long, TimeUnit) and forcibly destroy the process if it exceeds the timeout.

Suggested fix
+import java.util.concurrent.TimeUnit;
@@
-        int exit = p.waitFor();
-        if (exit == 0) {
+        boolean finished = p.waitFor(3, TimeUnit.SECONDS);
+        if (!finished) {
+          p.destroyForcibly();
+          continue;
+        }
+        if (p.exitValue() == 0) {
           LOGGER.info("Resolved Python binary: {}", candidate);
           return candidate;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 111 - 117, The resolvePythonBinary method currently calls
p.waitFor() which can block indefinitely; change it to use p.waitFor(timeout,
TimeUnit.SECONDS) with a sensible short timeout (e.g., a few seconds) and, if it
returns false, call p.destroyForcibly() and continue to the next candidate;
ensure you handle InterruptedException and IOException the same way you do now
and close any streams from the Process to avoid resource leaks when using
Process p in resolvePythonBinary.
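
A minimal, self-contained sketch of the bounded-wait probe. `PythonProbe` is a hypothetical helper and the timeout value is an assumed parameter, not existing code:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: probe candidate binaries without blocking indefinitely. */
public class PythonProbe {

  /** Returns the first candidate whose "--version" call exits 0 within the timeout. */
  public static String resolve(String[] candidates, long timeoutSeconds) {
    for (String candidate : candidates) {
      try {
        Process p = new ProcessBuilder(candidate, "--version")
            .redirectErrorStream(true).start();
        // Bounded wait: a hung child can no longer stall interpreter startup.
        boolean finished = p.waitFor(timeoutSeconds, TimeUnit.SECONDS);
        if (!finished) {
          p.destroyForcibly();
          continue;
        }
        if (p.exitValue() == 0) {
          return candidate;
        }
      } catch (java.io.IOException e) {
        // Candidate not found on PATH; try the next one.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }
    return null;
  }
}
```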

Comment on lines +142 to +147
public int getMaxResult() {
if (sparkConnectInterpreter != null) {
return sparkConnectInterpreter.getMaxResult();
}
return Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find and examine the specific file
find . -type f -name "PySparkConnectInterpreter.java" | head -5

Repository: Meesho/zeppelin

Length of output: 145


🏁 Script executed:

# Once file is located, check the method in context
rg -A 10 "public int getMaxResult" --type java

Repository: Meesho/zeppelin

Length of output: 5252


🏁 Script executed:

# Check how getProperty is implemented
rg -B 3 -A 5 "getProperty\(" --type java | head -50

Repository: Meesho/zeppelin

Length of output: 6136


🏁 Script executed:

# Look for similar defensive patterns in the codebase
rg "Integer\.parseInt.*catch|NumberFormatException" --type java | head -20

Repository: Meesho/zeppelin

Length of output: 2067


🏁 Script executed:

# Check if there's any property validation elsewhere
rg "zeppelin\.spark\.maxResult" --type java

Repository: Meesho/zeppelin

Length of output: 3225


Add defensive handling for malformed zeppelin.spark.maxResult property.

Integer.parseInt() throws NumberFormatException when the property value is non-numeric. A try-catch with fallback to the default value is needed, consistent with defensive patterns used elsewhere in the codebase (e.g., ZeppelinConfiguration, ElasticsearchInterpreter).

Suggested fix
   public int getMaxResult() {
     if (sparkConnectInterpreter != null) {
       return sparkConnectInterpreter.getMaxResult();
     }
-    return Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));
+    try {
+      return Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid zeppelin.spark.maxResult; fallback to 1000", e);
+      return 1000;
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 142 - 147, The getMaxResult method in PySparkConnectInterpreter
currently calls Integer.parseInt(getProperty("zeppelin.spark.maxResult",
"1000")) which will throw NumberFormatException for non-numeric values; change
getMaxResult to defensively parse the property inside a try-catch (catch
NumberFormatException) and return the default 1000 when parsing fails or the
property is malformed, and optionally log a warning via the class logger to
indicate the fallback; keep the existing branch that defers to
sparkConnectInterpreter.getMaxResult() when sparkConnectInterpreter != null.
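
The defensive parse pattern can be factored into a small helper, sketched here with a hypothetical `SafeIntParse` class (not part of the PR):

```java
/** Hypothetical sketch: parse a property value, falling back to a default on bad input. */
public class SafeIntParse {
  public static int parseOrDefault(String value, int defaultValue) {
    if (value == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      // Malformed property value: fall back instead of failing the paragraph.
      return defaultValue;
    }
  }
}
```

Routing every numeric property through one helper keeps the fallback behavior consistent across interpreters instead of scattering try-catch blocks.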

Comment on lines +153 to +166
public void close() throws InterpreterException {
if (sparkSession != null) {
try {
sparkSession.close();
LOGGER.info("Spark Connect session closed for user: {}", currentUser);
} catch (Exception e) {
LOGGER.warn("Error closing Spark Connect session", e);
}
sparkSession = null;
}
if (sessionSlotAcquired) {
releaseSessionSlot(currentUser);
sessionSlotAcquired = false;
}

⚠️ Potential issue | 🟠 Major

close() should be synchronized to prevent double session-slot release.

Line 163 reads and mutates sessionSlotAcquired without synchronization; concurrent close() calls can decrement userSessionCount more than once for one interpreter instance.

Proposed fix
-  public void close() throws InterpreterException {
+  public synchronized void close() throws InterpreterException {
     if (sparkSession != null) {
       try {
         sparkSession.close();
         LOGGER.info("Spark Connect session closed for user: {}", currentUser);
       } catch (Exception e) {
         LOGGER.warn("Error closing Spark Connect session", e);
       }
       sparkSession = null;
     }
     if (sessionSlotAcquired) {
       releaseSessionSlot(currentUser);
       sessionSlotAcquired = false;
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
around lines 153 - 166, The close() method in SparkConnectInterpreter must be
made thread-safe to avoid double release of session slots: synchronize access to
the sessionSlotAcquired flag and the call to releaseSessionSlot so that only one
thread can observe-and-clear the flag and perform release; e.g., add a
synchronized method or a synchronized(this) block around the section that checks
sessionSlotAcquired, calls releaseSessionSlot(currentUser) and sets
sessionSlotAcquired = false, while leaving sparkSession.close() handling as-is
(or also inside the same synchronization if preferred) to ensure release happens
exactly once.
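
An alternative to synchronizing the whole method is an `AtomicBoolean` guard: `compareAndSet` lets exactly one caller perform the release. The `SlotGuard` class below is a hypothetical sketch; `releaseCount` exists only to make the exactly-once behavior observable:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: an atomic observe-and-clear guard around slot release. */
public class SlotGuard {
  private final AtomicBoolean slotAcquired = new AtomicBoolean(true);
  private int releaseCount = 0;

  public void close() {
    // Only the first thread to flip true -> false performs the release;
    // every later close() call (concurrent or repeated) is a no-op.
    if (slotAcquired.compareAndSet(true, false)) {
      releaseSlot();
    }
  }

  private synchronized void releaseSlot() {
    releaseCount++;  // stands in for decrementing userSessionCount
  }

  public int getReleaseCount() {
    return releaseCount;
  }
}
```

Compared with `synchronized close()`, this keeps the potentially slow `sparkSession.close()` out of a lock while still making the slot release idempotent.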

Comment on lines +193 to +194
int limit = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
String.valueOf(maxResult)));

⚠️ Potential issue | 🟠 Major

Handle invalid limit values defensively.

Line 193 can throw NumberFormatException, which currently escapes the SQL-error handling path and fails the paragraph with a less controlled error.

Proposed fix
-      int limit = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
-          String.valueOf(maxResult)));
+      int limit = maxResult;
+      String limitProp = context.getLocalProperties().get("limit");
+      if (StringUtils.isNotBlank(limitProp)) {
+        try {
+          limit = Integer.parseInt(limitProp);
+        } catch (NumberFormatException nfe) {
+          return new InterpreterResult(Code.ERROR,
+              "Invalid local property 'limit': " + limitProp);
+        }
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
around lines 193 - 194, The parsing of the "limit" property in
SparkConnectInterpreter (the line using Integer.parseInt on
context.getLocalProperties().getOrDefault) can throw NumberFormatException and
bypass the SQL-error handling; modify this by validating/parsing the value
defensively: wrap the parse in a try/catch (catch NumberFormatException), reject
non-positive or malformed values (e.g., NaN, negative, zero) by falling back to
maxResult or clamping within a safe range, and ensure any parse error is handled
and logged consistently so it flows through the same SQL-error path as other
runtime errors; update references to context.getLocalProperties(), the 'limit'
variable and 'maxResult' accordingly.

Comment on lines +269 to +271
LOGGER.error("Error executing SQL: {}", sql, e);
context.out.write("\nError in SQL: " + sql + "\n");
if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace", "true"))) {

⚠️ Potential issue | 🟠 Major

Avoid logging raw SQL text on server error logs.

Line 269 logs full SQL statements; this can leak sensitive literals/PII into centralized logs.

Proposed fix
-      LOGGER.error("Error executing SQL: {}", sql, e);
+      LOGGER.error("Error executing SQL statement", e);


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
spark-connect/src/main/resources/interpreter-setting.json (1)

111-116: ⚠️ Potential issue | 🟠 Major

Treat spark.connect.token as a secret field.

This is still declared as "type": "string", so the token will be visible in the interpreter UI/export instead of being masked like other credentials.

🔐 Proposed change
       "spark.connect.token": {
         "envName": "SPARK_CONNECT_TOKEN",
         "propertyName": "spark.connect.token",
         "defaultValue": "",
         "description": "Authentication token for Spark Connect (optional)",
-        "type": "string"
+        "type": "password"
       },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/resources/interpreter-setting.json` around lines 111 -
116, The interpreter-setting entry for spark.connect.token currently uses
"type": "string" so the token is exposed; update the spark.connect.token setting
(propertyName "spark.connect.token", envName "SPARK_CONNECT_TOKEN") to be
treated as a secret by changing its type to a masked/secret type (e.g., "type":
"password" or the project's secret flag) so the interpreter UI and exports will
mask the value while keeping the envName and defaultValue intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`:
- Around line 171-182: The override in PySparkConnectInterpreter (method that
returns the Python executable) removes the base PythonInterpreter's Conda
fallback and narrows setupCondaLibraryPath() to only match paths containing
"/conda/", causing conda environments managed by Zeppelin or installed elsewhere
to be ignored; restore the base-class fallback behavior by delegating to or
reusing PythonInterpreter's conda-detection logic instead of unconditionally
returning "python" (e.g., invoke the superclass method or call the same conda
resolution used in PythonInterpreter), and update setupCondaLibraryPath() to
detect conda envs more robustly (check for conda metadata like "conda-meta"
directory or use CONDA_PREFIX/CONDA_EXE environment variables) so conda
environments configured through Zeppelin or located under different directories
are recognized.
- Around line 110-123: The code sets PYSPARK_PYTHON using getPythonExec(), which
uses the driver-only resolution and ignores spark.pyspark.python; add a new
helper (e.g., getWorkerPythonExec or resolveWorkerPythonExec) that resolves the
worker Python by first checking the spark.pyspark.python setting, then the
PYSPARK_PYTHON environment variable, and only then falling back to the driver
executable from getPythonExec(); update setupPythonEnv() to call this new method
to set PYSPARK_PYTHON (and pass that worker path into setupCondaLibraryPath) and
add a clear log entry (LOGGER.info) showing the chosen worker python.

---

Duplicate comments:
In `@spark-connect/src/main/resources/interpreter-setting.json`:
- Around line 111-116: The interpreter-setting entry for spark.connect.token
currently uses "type": "string" so the token is exposed; update the
spark.connect.token setting (propertyName "spark.connect.token", envName
"SPARK_CONNECT_TOKEN") to be treated as a secret by changing its type to a
masked/secret type (e.g., "type": "password" or the project's secret flag) so
the interpreter UI and exports will mask the value while keeping the envName and
defaultValue intact.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1cac382f-883c-4855-a4e3-8576f8ab8234

📥 Commits

Reviewing files that changed from the base of the PR and between 0e79887 and 90efa08.

📒 Files selected for processing (2)
  • spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java
  • spark-connect/src/main/resources/interpreter-setting.json

Comment on lines +110 to +123
protected Map<String, String> setupPythonEnv() throws IOException {
Map<String, String> env = super.setupPythonEnv();

// Set PYSPARK_PYTHON environment variable (following Spark's pattern)
// This ensures Python subprocesses can find the correct Python executable
String pythonExec = getPythonExec();
env.put("PYSPARK_PYTHON", pythonExec);
LOGGER.info("Set PYSPARK_PYTHON: {}", pythonExec);

// Set up LD_LIBRARY_PATH for conda installations
// This is critical - conda Python binaries depend on libraries in conda/lib
setupCondaLibraryPath(env, pythonExec);

LOGGER.info("LD_LIBRARY_PATH: {}", env.get("LD_LIBRARY_PATH"));

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "PySparkConnectInterpreter.java" -o -name "interpreter-setting.json" | head -20

Repository: Meesho/zeppelin

Length of output: 1149


🏁 Script executed:

# Find the PySparkConnectInterpreter.java file
fd "PySparkConnectInterpreter.java"

Repository: Meesho/zeppelin

Length of output: 229


🏁 Script executed:

# Check directory structure for spark-connect
git ls-files spark-connect/ | grep -E "(\.java|\.json)" | head -20

Repository: Meesho/zeppelin

Length of output: 959


🏁 Script executed:

# Read the PySparkConnectInterpreter.java file
cat -n spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java | head -250

Repository: Meesho/zeppelin

Length of output: 11070


🏁 Script executed:

# Check the interpreter-setting.json for spark-connect
cat spark-connect/src/main/resources/interpreter-setting.json

Repository: Meesho/zeppelin

Length of output: 6576


🏁 Script executed:

# Also check the parent PythonInterpreter class to understand getPythonExec()
fd "PythonInterpreter.java" | xargs -I {} sh -c 'echo "=== {} ===" && wc -l {}'

Repository: Meesho/zeppelin

Length of output: 368


🏁 Script executed:

# Read the parent PythonInterpreter class to understand the context
cat -n python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | grep -A 50 "protected Map<String, String> setupPythonEnv"

Repository: Meesho/zeppelin

Length of output: 2102


🏁 Script executed:

# Also check the parent getPythonExec() method to see if it's overridden
cat -n python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java | grep -B 5 -A 30 "protected String getPythonExec"

Repository: Meesho/zeppelin

Length of output: 1406


🏁 Script executed:

# Verify the exact behavior by checking if getPythonExec is called elsewhere
rg "getPythonExec\(\)" spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java -n

Repository: Meesho/zeppelin

Length of output: 263


🏁 Script executed:

# Check how parent setupPythonEnv is called
rg "setupPythonEnv" python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java -B 2 -A 10

Repository: Meesho/zeppelin

Length of output: 1035


Separate worker and driver Python resolution for PYSPARK_PYTHON.

setupPythonEnv() uses getPythonExec() to set PYSPARK_PYTHON, which prioritizes spark.pyspark.driver.python. This causes spark.pyspark.python—intended for worker processes—to be ignored when both settings exist. The configuration UI exposes these as independent options, but the implementation doesn't honor this separation.

Suggested fix: Extract worker Python resolution into a separate method that checks only spark.pyspark.python and PYSPARK_PYTHON environment variable before falling back to the driver executable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 110 - 123, The code sets PYSPARK_PYTHON using getPythonExec(),
which uses the driver-only resolution and ignores spark.pyspark.python; add a
new helper (e.g., getWorkerPythonExec or resolveWorkerPythonExec) that resolves
the worker Python by first checking the spark.pyspark.python setting, then the
PYSPARK_PYTHON environment variable, and only then falling back to the driver
executable from getPythonExec(); update setupPythonEnv() to call this new method
to set PYSPARK_PYTHON (and pass that worker path into setupCondaLibraryPath) and
add a clear log entry (LOGGER.info) showing the chosen worker python.
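
The suggested fallback chain can be sketched as a pure function. `WorkerPythonResolver` is hypothetical; its name and ordering follow the prompt above, not existing code:

```java
import java.util.Map;

/**
 * Hypothetical sketch of worker Python resolution:
 * spark.pyspark.python -> PYSPARK_PYTHON env var -> driver executable.
 */
public class WorkerPythonResolver {
  public static String resolve(Map<String, String> properties,
                               Map<String, String> env,
                               String driverPythonExec) {
    String workerProp = properties.get("spark.pyspark.python");
    if (workerProp != null && !workerProp.trim().isEmpty()) {
      return workerProp;       // explicit worker-side setting wins
    }
    String envPython = env.get("PYSPARK_PYTHON");
    if (envPython != null && !envPython.trim().isEmpty()) {
      return envPython;        // environment variable is next
    }
    return driverPythonExec;   // finally, fall back to the driver executable
  }
}
```

Keeping the resolution as a side-effect-free method makes the driver/worker separation easy to unit-test independently of `setupPythonEnv()`.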

Comment on lines +171 to +182
// Priority 5: zeppelin.python (Zeppelin property) - only if explicitly set
String zeppelinPython = getProperty("zeppelin.python", "");
if (StringUtils.isNotBlank(zeppelinPython)) {
LOGGER.info("Using Python executable from zeppelin.python property: {}", zeppelinPython);
return zeppelinPython;
}

// Priority 6: Default to "python" (let system PATH handle it, just like Spark)
// Spark's PySparkInterpreter defaults to "python" - we do the same
// This relies on system PATH to find Python, no explicit path needed
LOGGER.info("No Python executable configured, defaulting to 'python' (will use system PATH)");
return "python";

⚠️ Potential issue | 🟠 Major

Keep the base interpreter's Conda fallback intact.

Compared with python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java, this override drops the inherited Conda fallback and setupCondaLibraryPath() only recognizes executables whose path literally contains /conda/. Conda envs configured through Zeppelin or installed under a different directory name will miss this path entirely.

♻️ Proposed change
-    String zeppelinPython = getProperty("zeppelin.python", "");
-    if (StringUtils.isNotBlank(zeppelinPython)) {
-      LOGGER.info("Using Python executable from zeppelin.python property: {}", zeppelinPython);
-      return zeppelinPython;
-    }
-    
-    // Priority 6: Default to "python" (let system PATH handle it, just like Spark)
-    // Spark's PySparkInterpreter defaults to "python" - we do the same
-    // This relies on system PATH to find Python, no explicit path needed
-    LOGGER.info("No Python executable configured, defaulting to 'python' (will use system PATH)");
-    return "python";
+    String pythonExec = super.getPythonExec();
+    LOGGER.info("Using Python executable from Zeppelin fallback chain: {}", pythonExec);
+    return pythonExec;
   }
   
   private void setupCondaLibraryPath(Map<String, String> env, String pythonExec) {
-    // If python path contains "/conda/", add conda lib to LD_LIBRARY_PATH
-    // This only applies if an explicit conda path is configured
-    if (pythonExec != null && pythonExec.contains("/conda/")) {
-      // Extract conda base path (e.g., /opt/conda/default from /opt/conda/default/bin/python3)
-      int binIndex = pythonExec.indexOf("/bin/");
-      if (binIndex > 0) {
-        String condaBase = pythonExec.substring(0, binIndex);
-        String condaLib = condaBase + "/lib";
-        java.io.File libDir = new java.io.File(condaLib);
-        if (libDir.exists() && libDir.isDirectory()) {
+    if (StringUtils.isBlank(pythonExec)) {
+      return;
+    }
+    int binIndex = pythonExec.lastIndexOf("/bin/");
+    if (binIndex > 0) {
+      String envBase = pythonExec.substring(0, binIndex);
+      java.io.File condaMetaDir = new java.io.File(envBase, "conda-meta");
+      java.io.File libDir = new java.io.File(envBase, "lib");
+      if (condaMetaDir.isDirectory() && libDir.isDirectory()) {
+        String condaLib = libDir.getAbsolutePath();
           String ldLibraryPath = env.getOrDefault("LD_LIBRARY_PATH", "");
           if (ldLibraryPath.isEmpty()) {
             env.put("LD_LIBRARY_PATH", condaLib);
           } else if (!ldLibraryPath.contains(condaLib)) {
             env.put("LD_LIBRARY_PATH", condaLib + ":" + ldLibraryPath);
           }
           LOGGER.info("Added conda lib directory to LD_LIBRARY_PATH: {}", condaLib);
-        }
       }
     }
-    // If using "python" from PATH, don't modify LD_LIBRARY_PATH
-    // Let the system handle it - Python should already be configured correctly
   }

Also applies to: 185-205

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/PySparkConnectInterpreter.java`
around lines 171 - 182, The override in PySparkConnectInterpreter (method that
returns the Python executable) removes the base PythonInterpreter's Conda
fallback and narrows setupCondaLibraryPath() to only match paths containing
"/conda/", causing conda environments managed by Zeppelin or installed elsewhere
to be ignored; restore the base-class fallback behavior by delegating to or
reusing PythonInterpreter's conda-detection logic instead of unconditionally
returning "python" (e.g., invoke the superclass method or call the same conda
resolution used in PythonInterpreter), and update setupCondaLibraryPath() to
detect conda envs more robustly (check for conda metadata like "conda-meta"
directory or use CONDA_PREFIX/CONDA_EXE environment variables) so conda
environments configured through Zeppelin or located under different directories
are recognized.
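The more robust detection this prompt describes can be sketched as follows. The class and method names are illustrative, not the PR's actual API, and the directory check is injected as a predicate so the logic can be exercised without a real conda installation:

```java
import java.util.function.Predicate;

public class CondaDetect {
  // Hypothetical sketch: a directory counts as a conda env if it contains a
  // "conda-meta" subdirectory; CONDA_PREFIX serves as a fallback. The isDir
  // predicate stands in for File.isDirectory() so the logic is unit-testable.
  static String resolveCondaBase(String pythonExec, String condaPrefix,
                                 Predicate<String> isDir) {
    if (pythonExec != null) {
      int binIndex = pythonExec.lastIndexOf("/bin/");
      if (binIndex > 0) {
        String base = pythonExec.substring(0, binIndex);
        if (isDir.test(base + "/conda-meta")) {
          return base;
        }
      }
    }
    if (condaPrefix != null && isDir.test(condaPrefix + "/conda-meta")) {
      return condaPrefix;
    }
    return null;
  }
}
```

This recognizes conda environments both under the configured interpreter path and under whatever CONDA_PREFIX points at, rather than keying on a "/conda/" substring.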

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java (1)

185-190: Consider handling carriage return (\r) as well.

The method replaces \t and \n but not \r, which could also disrupt table formatting in some environments.

♻️ Proposed fix
   static String replaceReservedChars(String str) {
     if (str == null) {
       return "null";
     }
-    return str.replace('\t', ' ').replace('\n', ' ');
+    return str.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ');
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java`
around lines 185 - 190, The method replaceReservedChars currently only replaces
tabs and newlines which misses carriage returns; update replaceReservedChars to
also sanitize '\r' (either by chaining another replace for '\r' or using a
single regex replaceAll/replace to cover '\t', '\n', and '\r') so all three
reserved whitespace characters are converted to spaces and table formatting
won't break.
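The single-regex variant the prompt mentions can be sketched like this; the method name mirrors the one under review, but the snippet itself is illustrative rather than the PR's code:

```java
public class Sanitize {
  // One-pass variant covering \t, \n and \r; behaviorally equivalent to
  // chaining three replace() calls, but easier to extend with more characters.
  static String replaceReservedChars(String str) {
    if (str == null) {
      return "null";
    }
    return str.replaceAll("[\\t\\n\\r]", " ");
  }
}
```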
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`:
- Line 138: The direct Integer.parseInt call for "zeppelin.spark.maxResult" in
SparkConnectInterpreter can throw NumberFormatException for non-numeric input;
wrap the parse in a defensive block (try/catch NumberFormatException) when
reading getProperty("zeppelin.spark.maxResult", "1000") in the
SparkConnectInterpreter initialization, fall back to the default value 1000 on
error, and log a clear warning including the bad value and that the default is
used (use the class logger or existing logging mechanism used for the max
sessions handling).
- Around line 90-92: The config parsing in SparkConnectInterpreter uses
Integer.parseInt on getProperty("zeppelin.spark.connect.maxSessionsPerUser",
...) which will throw NumberFormatException for non-numeric values; wrap the
parse in a try/catch, validate the string first or catch NumberFormatException
and then either fallback to DEFAULT_MAX_SESSIONS_PER_USER or rethrow a clear
IllegalArgumentException with the property name and invalid value (include the
offending value in the message), and ensure the variable maxSessions is set to
the safe fallback when invalid; update the code near where maxSessions is
assigned in class SparkConnectInterpreter to implement this defensive parsing
and clear error messaging.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java`:
- Around line 64-77: The code in SparkConnectUtils that appends raw token and
userName into the connection params can produce malformed URIs or allow
parameter injection; update the logic to URL-encode token and userName before
appending (use java.net.URLEncoder with UTF-8) so
params.append(";token=").append(URLEncoder.encode(token, "UTF-8")) and
params.append(";user_id=").append(URLEncoder.encode(userName, "UTF-8")). Keep
the existing guards (StringUtils.isNotBlank(token)/userName and
!remote.contains("user_id=")) but apply encoding to the values you append.

---

Nitpick comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java`:
- Around line 185-190: The method replaceReservedChars currently only replaces
tabs and newlines which misses carriage returns; update replaceReservedChars to
also sanitize '\r' (either by chaining another replace for '\r' or using a
single regex replaceAll/replace to cover '\t', '\n', and '\r') so all three
reserved whitespace characters are converted to spaces and table formatting
won't break.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dfdb6df6-9187-41d6-b2b2-1604a6bbf9a4

📥 Commits

Reviewing files that changed from the base of the PR and between 90efa08 and 7d26fd3.

📒 Files selected for processing (3)
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java
  • spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectUtilsTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark-connect/src/test/java/org/apache/zeppelin/spark/SparkConnectUtilsTest.java

Comment on lines +90 to +92
int maxSessions = Integer.parseInt(
getProperty("zeppelin.spark.connect.maxSessionsPerUser",
String.valueOf(DEFAULT_MAX_SESSIONS_PER_USER)));

⚠️ Potential issue | 🟡 Minor

Handle invalid configuration values defensively.

If zeppelin.spark.connect.maxSessionsPerUser contains a non-numeric value, Integer.parseInt throws NumberFormatException which propagates as a generic connection failure. Consider validating with a clearer error message.

🛡️ Proposed fix
-      int maxSessions = Integer.parseInt(
-          getProperty("zeppelin.spark.connect.maxSessionsPerUser",
-              String.valueOf(DEFAULT_MAX_SESSIONS_PER_USER)));
+      int maxSessions;
+      String maxSessionsProp = getProperty("zeppelin.spark.connect.maxSessionsPerUser",
+          String.valueOf(DEFAULT_MAX_SESSIONS_PER_USER));
+      try {
+        maxSessions = Integer.parseInt(maxSessionsProp);
+      } catch (NumberFormatException e) {
+        throw new InterpreterException(
+            "Invalid value for zeppelin.spark.connect.maxSessionsPerUser: " + maxSessionsProp);
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
around lines 90 - 92, The config parsing in SparkConnectInterpreter uses
Integer.parseInt on getProperty("zeppelin.spark.connect.maxSessionsPerUser",
...) which will throw NumberFormatException for non-numeric values; wrap the
parse in a try/catch, validate the string first or catch NumberFormatException
and then either fallback to DEFAULT_MAX_SESSIONS_PER_USER or rethrow a clear
IllegalArgumentException with the property name and invalid value (include the
offending value in the message), and ensure the variable maxSessions is set to
the safe fallback when invalid; update the code near where maxSessions is
assigned in class SparkConnectInterpreter to implement this defensive parsing
and clear error messaging.

sparkSession = builder.getOrCreate();
LOGGER.info("Spark Connect session established for user: {}", currentUser);

maxResult = Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));

⚠️ Potential issue | 🟡 Minor

Handle invalid zeppelin.spark.maxResult values defensively.

Similar to the max sessions property, a non-numeric zeppelin.spark.maxResult will throw NumberFormatException with an unclear error message.

🛡️ Proposed fix
-      maxResult = Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));
+      String maxResultProp = getProperty("zeppelin.spark.maxResult", "1000");
+      try {
+        maxResult = Integer.parseInt(maxResultProp);
+      } catch (NumberFormatException e) {
+        throw new InterpreterException(
+            "Invalid value for zeppelin.spark.maxResult: " + maxResultProp);
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
at line 138, The direct Integer.parseInt call for "zeppelin.spark.maxResult" in
SparkConnectInterpreter can throw NumberFormatException for non-numeric input;
wrap the parse in a defensive block (try/catch NumberFormatException) when
reading getProperty("zeppelin.spark.maxResult", "1000") in the
SparkConnectInterpreter initialization, fall back to the default value 1000 on
error, and log a clear warning including the bad value and that the default is
used (use the class logger or existing logging mechanism used for the max
sessions handling).
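Since the same defensive-parsing pattern is needed for both properties, a shared helper could avoid duplicating the try/catch. The helper name is hypothetical (nothing like it exists in the PR), and it implements the warn-and-fallback option rather than the throw option:

```java
public class PropertyUtil {
  // Hypothetical helper: parse an int property, warning and falling back to a
  // default instead of letting NumberFormatException propagate.
  static int parseIntProperty(String name, String value, int fallback) {
    if (value == null) {
      return fallback;
    }
    try {
      return Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      System.err.println("Invalid value '" + value + "' for " + name
          + "; using default " + fallback);
      return fallback;
    }
  }
}
```

Both call sites would then reduce to one line, e.g. `maxResult = PropertyUtil.parseIntProperty("zeppelin.spark.maxResult", getProperty("zeppelin.spark.maxResult", "1000"), 1000);`.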

Comment on lines +64 to +77
String token = properties.getProperty("spark.connect.token", "");
if (StringUtils.isNotBlank(token)) {
params.append(";token=").append(token);
}

boolean useSsl = Boolean.parseBoolean(
properties.getProperty("spark.connect.use_ssl", "false"));
if (useSsl) {
params.append(";use_ssl=true");
}

if (StringUtils.isNotBlank(userName) && !remote.contains("user_id=")) {
params.append(";user_id=").append(userName);
}

⚠️ Potential issue | 🟡 Minor

URL-encode token and userName to prevent URI parsing issues.

If token or userName contains special characters (;, =, /, spaces), the connection string can be malformed or allow parameter injection. Consider encoding these values.

🛡️ Proposed fix to add URL encoding
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
 ...
 
     String token = properties.getProperty("spark.connect.token", "");
     if (StringUtils.isNotBlank(token)) {
-      params.append(";token=").append(token);
+      params.append(";token=").append(URLEncoder.encode(token, StandardCharsets.UTF_8));
     }
 
     boolean useSsl = Boolean.parseBoolean(
         properties.getProperty("spark.connect.use_ssl", "false"));
     if (useSsl) {
       params.append(";use_ssl=true");
     }
 
     if (StringUtils.isNotBlank(userName) && !remote.contains("user_id=")) {
-      params.append(";user_id=").append(userName);
+      params.append(";user_id=").append(URLEncoder.encode(userName, StandardCharsets.UTF_8));
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectUtils.java`
around lines 64 - 77, The code in SparkConnectUtils that appends raw token and
userName into the connection params can produce malformed URIs or allow
parameter injection; update the logic to URL-encode token and userName before
appending (use java.net.URLEncoder with UTF-8) so
params.append(";token=").append(URLEncoder.encode(token, "UTF-8")) and
params.append(";user_id=").append(URLEncoder.encode(userName, "UTF-8")). Keep
the existing guards (StringUtils.isNotBlank(token)/userName and
!remote.contains("user_id=")) but apply encoding to the values you append.
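One caveat when applying that fix: java.net.URLEncoder performs HTML form-encoding, so a space becomes '+' rather than '%20'. If the Spark Connect connection-string parser expects percent-encoding (an assumption worth verifying against the client), a post-replace is a common workaround:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ConnParamEncode {
  // Form-encode, then normalize '+' to "%20" for URI-style consumers.
  // Whether Spark Connect's parser requires this normalization is an
  // assumption; reserved characters like ';' and '=' are escaped either way.
  static String encodeComponent(String value) {
    return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
  }
}
```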

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`:
- Around line 54-65: After opening sparkConnectInterpreter, ensure you roll
back/close it if any following Python setup or super.open() fails: wrap the
calls that use pySparkConnectInterpreter (setProperty("zeppelin.python", ...),
setUseBuiltinPy4j(true), setAdditionalPythonInitFile(...)) and super.open() in a
try/catch (or try/finally) and on any exception call
sparkConnectInterpreter.close() (and set opened=false if you set it before)
before rethrowing the exception; reference the existing symbols
sparkConnectInterpreter, pySparkConnectInterpreter, setProperty,
setUseBuiltinPy4j, setAdditionalPythonInitFile, super.open, and opened to locate
where to add the rollback.
- Around line 105-113: The close() method currently resets IPySpark state but
does not close the underlying SparkConnectInterpreter, causing stale sessions;
update IPySparkConnectInterpreter.close() to check if sparkConnectInterpreter is
non-null and call its close() (handling/propagating InterpreterException as
appropriate) before nullifying it, ensuring you call super.close() and then
close sparkConnectInterpreter (or close it inside the try/finally) so the
underlying SparkConnectInterpreter is properly closed and its session slot
released.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`:
- Around line 148-159: If open() fails after assigning sparkSession via
builder.getOrCreate(), the catch block must close and null out the
partially-open session to avoid a half-open interpreter; update the exception
handling in SparkConnectInterpreter.open() so that in the generic Exception
catch you detect non-null sparkSession and call its shutdown/close/stop method
(sparkSession.stop() or sparkSession.close()), set sparkSession = null, and then
proceed to releaseSessionSlot(currentUser) and clear sessionSlotAcquired;
reference sparkSession, open(), getOrCreate(), releaseSessionSlot, and
sessionSlotAcquired when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0189ec59-f5cf-4036-9394-6fb223345cec

📥 Commits

Reviewing files that changed from the base of the PR and between 7d26fd3 and 0342ad7.

📒 Files selected for processing (2)
  • spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java
  • spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java

Comment on lines +54 to +65
this.sparkConnectInterpreter =
getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
this.pySparkConnectInterpreter =
getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);

sparkConnectInterpreter.open();

setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
setUseBuiltinPy4j(true);
setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
super.open();
opened = true;

⚠️ Potential issue | 🟠 Major

Rollback the Spark Connect session if IPython startup fails.

After Line 59 succeeds, any exception from Python setup or super.open() leaves sparkConnectInterpreter running even though this wrapper never finished opening.

Suggested fix
   @Override
   public synchronized void open() throws InterpreterException {
     if (opened) {
       return;
     }
 
-    this.sparkConnectInterpreter =
-        getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
-    this.pySparkConnectInterpreter =
-        getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);
-
-    sparkConnectInterpreter.open();
-
-    setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
-    setUseBuiltinPy4j(true);
-    setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
-    super.open();
-    opened = true;
+    try {
+      this.sparkConnectInterpreter =
+          getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
+      this.pySparkConnectInterpreter =
+          getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);
+
+      sparkConnectInterpreter.open();
+
+      setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
+      setUseBuiltinPy4j(true);
+      setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
+      super.open();
+      opened = true;
+    } catch (InterpreterException | RuntimeException e) {
+      try {
+        if (sparkConnectInterpreter != null) {
+          sparkConnectInterpreter.close();
+        }
+      } catch (InterpreterException closeError) {
+        LOGGER.warn("Error rolling back Spark Connect interpreter after open failure",
+            closeError);
+      }
+      sparkConnectInterpreter = null;
+      pySparkConnectInterpreter = null;
+      throw e;
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 54 - 65, After opening sparkConnectInterpreter, ensure you roll
back/close it if any following Python setup or super.open() fails: wrap the
calls that use pySparkConnectInterpreter (setProperty("zeppelin.python", ...),
setUseBuiltinPy4j(true), setAdditionalPythonInitFile(...)) and super.open() in a
try/catch (or try/finally) and on any exception call
sparkConnectInterpreter.close() (and set opened=false if you set it before)
before rethrowing the exception; reference the existing symbols
sparkConnectInterpreter, pySparkConnectInterpreter, setProperty,
setUseBuiltinPy4j, setAdditionalPythonInitFile, super.open, and opened to locate
where to add the rollback.

Comment on lines +105 to +113
public void close() throws InterpreterException {
LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
try {
super.close();
} finally {
opened = false;
sparkConnectInterpreter = null;
pySparkConnectInterpreter = null;
LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");

⚠️ Potential issue | 🟠 Major

Close the underlying SparkConnectInterpreter here too.

open() explicitly opens sparkConnectInterpreter, but close() only tears down the IPython side. Reopening this interpreter can therefore reuse a stale Spark Connect session and keep the per-user session slot occupied.

Suggested fix
   @Override
   public void close() throws InterpreterException {
     LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
     try {
       super.close();
     } finally {
+      if (sparkConnectInterpreter != null) {
+        try {
+          sparkConnectInterpreter.close();
+        } catch (InterpreterException e) {
+          LOGGER.warn("Error closing SparkConnectInterpreter", e);
+        }
+      }
       opened = false;
+      curIntpContext = null;
       sparkConnectInterpreter = null;
       pySparkConnectInterpreter = null;
       LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 105 - 113, The close() method currently resets IPySpark state but
does not close the underlying SparkConnectInterpreter, causing stale sessions;
update IPySparkConnectInterpreter.close() to check if sparkConnectInterpreter is
non-null and call its close() (handling/propagating InterpreterException as
appropriate) before nullifying it, ensuring you call super.close() and then
close sparkConnectInterpreter (or close it inside the try/finally) so the
underlying SparkConnectInterpreter is properly closed and its session slot
released.

Comment on lines +148 to +159
sparkSession = builder.getOrCreate();
LOGGER.info("Spark Connect session established for user: {}", currentUser);

maxResult = Integer.parseInt(getProperty("zeppelin.spark.maxResult", "1000"));
sqlSplitter = new SqlSplitter();
} catch (InterpreterException ie) {
throw ie;
} catch (Exception e) {
if (sessionSlotAcquired) {
releaseSessionSlot(currentUser);
sessionSlotAcquired = false;
}

⚠️ Potential issue | 🟠 Major

Close the remote session when open() fails after getOrCreate().

sparkSession is assigned at Line 148 before zeppelin.spark.maxResult is parsed. If that later initialization throws, the catch block releases the slot but leaves the remote session open and sparkSession non-null, so the interpreter is stuck in a half-open state.

Suggested fix
-    } catch (Exception e) {
+    } catch (Exception e) {
+      if (sparkSession != null) {
+        try {
+          sparkSession.close();
+        } catch (Exception closeError) {
+          LOGGER.warn("Error closing partially initialized Spark Connect session", closeError);
+        } finally {
+          sparkSession = null;
+        }
+      }
       if (sessionSlotAcquired) {
         releaseSessionSlot(currentUser);
         sessionSlotAcquired = false;
       }
       LOGGER.error("Failed to connect to Spark Connect server", e);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/SparkConnectInterpreter.java`
around lines 148 - 159, If open() fails after assigning sparkSession via
builder.getOrCreate(), the catch block must close and null out the
partially-open session to avoid a half-open interpreter; update the exception
handling in SparkConnectInterpreter.open() so that in the generic Exception
catch you detect non-null sparkSession and call its shutdown/close/stop method
(sparkSession.stop() or sparkSession.close()), set sparkSession = null, and then
proceed to releaseSessionSlot(currentUser) and clear sessionSlotAcquired;
reference sparkSession, open(), getOrCreate(), releaseSessionSlot, and
sessionSlotAcquired when making the change.
