feat(utilities): add external HudiHiveSyncJob for on-demand Hive sync#18204
Conversation
89c7363 to
9953414
Compare
9953414 to
e26d2bc
Compare
|
@suryaprasanna : looks like there are valid test failures in CI |
e26d2bc to
1a66b68
Compare
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
[Update review] PR #18204 adds a new HudiHiveSyncJob utility for running Hive metastore sync on-demand, independent of ingestion workflows. The implementation is straightforward but has three issues that were flagged in the previous review and remain unaddressed: a missing dash prefix on a CLI parameter alias (functional bug), a hardcoded local[2] Spark master that breaks cluster deployment, and a SparkContext leak in the main() method.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! The -bff alias dash prefix is now fixed, and the try/finally resource-guard for the SparkContext is a good addition. Unfortunately the new --spark-master branching logic has the condition inverted, so the fix for the hardcoded local[2] issue actually makes things worse — see the inline comment for the details.
| final Config cfg = new Config(); | ||
| new JCommander(cfg, null, args); | ||
| LOG.info("Cfg received: {}", cfg); | ||
| JavaSparkContext jsc; |
There was a problem hiding this comment.
🤖 The condition here is inverted. isNullOrEmpty is true when cfg.sparkMaster is blank (the default ""), so the if branch runs when no master was provided — and it passes the empty string to buildSparkContext. The else branch runs when the user does supply --spark-master yarn (or similar) and hardcodes local[2] instead of using their value. The branches need to be swapped:
if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) {
jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true);
} else {
jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true);
}- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! The inverted spark-master condition is now fixed — the branches correctly fall back to local[2] when no master is provided and use cfg.sparkMaster otherwise. The test lifecycle improvements (per-test setup/teardown with @BeforeEach/@AfterEach) and the defensive null guards in HiveTestUtil are sensible changes for better test isolation and cleanup safety. LGTM.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: The PR introduces a new Hive sync utility job for Hudi with CLI argument parsing and table synchronization capabilities, alongside comprehensive test coverage. Additionally, resource cleanup in test utilities is improved with conditional null-guarding and explicit null-assignment to prevent resource leaks.
Greptile Summary: This PR introduces HudiHiveSyncJob, a new standalone Spark utility for running Hive metastore sync on-demand against any Hudi table path, decoupled from ingestion pipelines. It also ships a full integration test backed by an embedded Hive server, and defensively hardens HiveTestUtil with null-checks on shared static fields to prevent cascading failures across test classes.
Key changes:
HudiHiveSyncJob.java— newspark-submit-compatible entry point; wrapsHiveSyncToolwith JCommander config parsing, timer logging, and afinallyclose block.TestHudiHiveSyncJob.java— end-to-end test that writes a Hudi dataset without Hive registration, then verifiesHudiHiveSyncJob.run()correctly registers it in the embedded metastore.HiveTestUtil.java— addsnullguards forddlExecutorinclear(), and assignsnullafter stoppinghiveServer,hiveTestService,zkServer,zkService, andfileSysteminshutdown()to prevent stale-reference bugs across tests.
Issues found:
- Exception suppression (P1): In
run(), ifsyncHoodieTable()throws and thensyncTool.close()also throws insidefinally, Java silently discards the primary exception. Using try-with-resources would preserve both via suppressed exception chaining. - Dead
throws IOException(P2):run()declaresthrows IOExceptionbut all exceptions are caught and re-thrown as uncheckedHoodieException; the declaration misleads callers. - Mutable
propsinrun()(P2):META_SYNC_BASE_PATHandMETA_SYNC_BASE_FILE_FORMATare placed into the sharedpropsfield on everyrun()call rather than in the constructor. getHiveConf()NPE risk (P2): Aftershutdown()setshiveServer = null, a call toHiveTestUtil.getHiveConf()would throw aNullPointerExceptionwith no diagnostic message.
Greptile Confidence Score: 3/5
Safe to merge after addressing the exception-suppression bug in run(); remaining issues are style/cleanup
The P1 exception-suppression issue in the finally block is a real correctness bug: when both syncHoodieTable() and close() fail, the primary sync failure is silently discarded and only the close exception surfaces, making production incidents very hard to diagnose. This warrants a fix before merge. The remaining P2s (dead throws IOException, props mutation in run(), getHiveConf() NPE guard) are style and defensive-coding concerns that do not affect the happy path.
hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java — exception handling in run()
Sequence Diagram (CodeRabbit):
sequenceDiagram
actor User
participant HudiHiveSyncJob
participant HiveSyncTool
participant HiveMetastore
participant SparkContext
User->>HudiHiveSyncJob: main(args)
HudiHiveSyncJob->>HudiHiveSyncJob: parse CLI args via JCommander
HudiHiveSyncJob->>SparkContext: create Spark context
SparkContext-->>HudiHiveSyncJob: context ready
HudiHiveSyncJob->>HudiHiveSyncJob: construct properties from file + configs
HudiHiveSyncJob->>HiveSyncTool: new HiveSyncTool(properties)
HudiHiveSyncJob->>HiveSyncTool: run()
HiveSyncTool->>HiveMetastore: sync table metadata
HiveMetastore-->>HiveSyncTool: sync complete
HudiHiveSyncJob->>HiveSyncTool: close()
HudiHiveSyncJob->>SparkContext: stop()
HudiHiveSyncJob-->>User: job finished
Sequence Diagram (Greptile):
sequenceDiagram
participant CLI as spark-submit / caller
participant Job as HudiHiveSyncJob
participant UH as UtilHelpers
participant JSC as JavaSparkContext
participant HST as HiveSyncTool
participant HMS as Hive Metastore
CLI->>Job: main(args)
Job->>UH: buildSparkContext(...)
UH-->>Job: JavaSparkContext
Job->>Job: new HudiHiveSyncJob(jsc, cfg)
note over Job: builds props from propsFilePath + --hoodie-conf overrides
Job->>Job: run()
Job->>Job: props.put(BASE_PATH, BASE_FILE_FORMAT)
Job->>HST: new HiveSyncTool(props, HiveConf)
Job->>HST: syncHoodieTable()
HST->>HMS: register / update table & partitions
HMS-->>HST: OK
HST-->>Job: (done)
Job->>HST: close() [finally]
Job->>JSC: stop() [finally in main]
CodeRabbit: yihua#35 (review)
Greptile: yihua#35 (review)
| } | ||
| LOG.info("Hive-sync duration in ms {}", timer.endTimer()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Exception suppression in
finally block
If syncTool.syncHoodieTable() throws (caught and re-thrown as HoodieException), and then syncTool.close() also throws a HoodieHiveSyncException inside the finally block, Java will suppress the original HoodieException — the primary sync failure and its cause are silently lost, replaced by the close-failure. This makes debugging very confusing.
Since HiveSyncTool implements AutoCloseable, the cleanest fix is try-with-resources, which automatically adds suppressed exceptions:
public void run() {
LOG.info("Starting hive sync for {}", cfg.basePath);
HoodieTimer timer = HoodieTimer.start();
props.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
LOG.info("HiveSyncConfig props used to sync data {}", props);
try (HiveSyncTool syncTool = new HiveSyncTool(props, new HiveConf(hadoopConf, HiveConf.class))) {
syncTool.syncHoodieTable();
} catch (Exception e) {
LOG.error("Exception in running hive-sync", e);
throw new HoodieException("Hive sync failed", e);
} finally {
LOG.info("Hive-sync duration in ms {}", timer.endTimer());
}
}— Greptile (original) (source:comment#3075833789)
| } | ||
| } | ||
|
|
||
| public void run() throws IOException { |
There was a problem hiding this comment.
Misleading
throws IOException declaration
run() declares throws IOException, but it never actually throws a checked IOException. All exceptions from HiveSyncTool are caught in the try/catch block and re-thrown as unchecked HoodieException. The syncTool.close() in the finally block throws HoodieHiveSyncException (also unchecked). The throws IOException is dead and misleads callers into handling an exception that will never be checked-thrown.
| public void run() throws IOException { | |
| public void run() { |
— Greptile (original) (source:comment#3075833850)
| HiveSyncTool syncTool = null; | ||
| try { | ||
| props.put(META_SYNC_BASE_PATH.key(), cfg.basePath); | ||
| props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat); |
There was a problem hiding this comment.
Mutable shared
props modified in run()
props is a field of HudiHiveSyncJob that is built once in the constructor. Calling props.put(...) inside run() mutates the shared field on every invocation. These two properties are known at construction time and should be set in the constructor, keeping run() free of field mutations:
// In the constructor, after building props:
this.props.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
this.props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);— Greptile (original) (source:comment#3075834235)
| public static void main(String[] args) throws IOException { | ||
| final Config cfg = new Config(); | ||
| new JCommander(cfg, null, args); | ||
| LOG.info("Cfg received: {}", cfg); |
There was a problem hiding this comment.
Sensitive values are being logged.
Line [79] and Line [101] can expose credentials because Config.toString() and props include user/password entries.
🔐 Proposed fix (redact or avoid secret-bearing logs)
- LOG.info("Cfg received: {}", cfg);
+ LOG.info("Cfg received: basePath={}, baseFileFormat={}, propsFilePath={}, configsCount={}",
+ cfg.basePath, cfg.baseFileFormat, cfg.propsFilePath, cfg.configs.size());
@@
- LOG.info("HiveSyncConfig props used to sync data {}", props);
+ LOG.info("HiveSyncConfig prepared for sync ({} entries)", props.size());Also applies to: 101-101, 136-142
— CodeRabbit (original) (source:comment#3075836235)
| jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true); | ||
| } else { | ||
| jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify actual Spark context builder semantics and conventions in sibling utilities jobs.
set -euo pipefail
echo "== Locate UtilHelpers =="
fd -i "UtilHelpers.java"
echo
echo "== Inspect buildSparkContext declarations/implementations =="
rg -n -C3 --type=java '\bbuildSparkContext\s*\('
echo
echo "== Compare spark-master fallback patterns in utilities jobs =="
rg -n -C3 --type=java 'sparkMaster|local\[2\]|buildSparkContext\("Hudi'Repository: yihua/hudi
Length of output: 50367
🏁 Script executed:
cat -n hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java | head -150Repository: yihua/hudi
Length of output: 6583
Fix --spark-master fallback to honor environment configuration.
Lines 81–85 hard-code local[2] when cfg.sparkMaster is empty, but the parameter description at lines 125–127 states it should inherit from the environment when not defined. Update to call UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.enableHiveSupport) when cfg.sparkMaster is empty, matching the behavior of other utilities like HoodieStreamer and respecting documented configuration precedence.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
around lines 81 - 85, The Spark master fallback currently hard-codes "local[2]"
when cfg.sparkMaster is empty; change the logic in HudiHiveSyncJob so that when
StringUtils.isNullOrEmpty(cfg.sparkMaster) it calls
UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.enableHiveSupport) instead
of using "local[2]" so the job inherits the environment/default Spark master
like HoodieStreamer; keep the existing branch that uses cfg.sparkMaster when
present and ensure you reference UtilHelpers.buildSparkContext and
cfg.enableHiveSupport in the updated call.
— CodeRabbit (original) (source:comment#3075836239)
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the updates! Since the last pass, the author replied to the prior reviewer comments on HudiHiveSyncJob.java (remove datacenter/unused variable, add Javadoc with example commands, close syncTool in a finally block) and indicated those have been addressed — no diff for that file is included in this incremental slice to re-verify, but the author's responses are tracked in the thread. The only new diff here adds @Disabled("Hive set up in CI is failing") to TestHudiHiveSyncJob, which is a pragmatic workaround but does mean the new job now has no enforced test coverage in CI; worth a follow-up to re-enable once the Hive CI setup is fixed. No new correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: Three files are modified to introduce on-demand Hive table synchronization. A new HudiHiveSyncJob utility class is added with a CLI entry point that parses arguments, initializes Spark, and invokes Hive sync. A corresponding test class validates the functionality. Test infrastructure improvements add null-safety guards and resource cleanup in HiveTestUtil.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant CLI as CLI/Main
participant HHSJ as HudiHiveSyncJob
participant JSC as JavaSparkContext
participant HST as HiveSyncTool
participant HM as Hive Metastore
CLI->>HHSJ: main(args) with Config
HHSJ->>JSC: create (local[2] or custom master)
HHSJ->>HHSJ: build TypedProperties + HiveConf
HHSJ->>HST: construct HiveSyncTool
HHSJ->>HST: syncHoodieTable()
HST->>HM: sync Hudi table metadata
HM-->>HST: confirmation
HST-->>HHSJ: sync complete
HHSJ->>HST: close (finally)
HHSJ->>JSC: stop (finally)
HHSJ-->>CLI: return / log duration
CodeRabbit: hudi-agent#7 (review)
| public static void main(String[] args) throws IOException { | ||
| final Config cfg = new Config(); | ||
| new JCommander(cfg, null, args); | ||
| LOG.info("Cfg received: {}", cfg); |
There was a problem hiding this comment.
Avoid logging full config/properties with credentials.
Both log lines can expose secrets passed via --hoodie-conf / properties (for example Hive password). Please log only non-sensitive fields and counts.
🔐 Suggested redaction-focused change
- LOG.info("Cfg received: {}", cfg);
+ LOG.info("Cfg received: basePath={}, baseFileFormat={}, propsFilePath={}, hoodieConfCount={}",
+ cfg.basePath, cfg.baseFileFormat, cfg.propsFilePath, cfg.configs.size());
...
- LOG.info("HiveSyncConfig props used to sync data {}", props);
+ LOG.info("HiveSyncConfig prepared: basePath={}, baseFileFormat={}, propCount={}",
+ cfg.basePath, cfg.baseFileFormat, props.size());Also applies to: 101-101
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
at line 79, The current LOG.info calls in HudiHiveSyncJob that log the full cfg
object (e.g., LOG.info("Cfg received: {}", cfg); and the similar log at the
later occurrence) can expose secrets; change these to redact sensitive
properties by filtering the configuration before logging: build a sanitized
map/summary from cfg that removes or masks keys containing sensitive substrings
("password", "secret", "token", "key", "credentials", "auth") and only include
non-sensitive entries and counts (e.g., number of properties, database/table
names), then log that sanitized summary instead of cfg; implement the sanitizer
as a small helper used by the HudiHiveSyncJob logging sites to ensure consistent
redaction.
— CodeRabbit (original) (source:comment#3111728505)
| jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true); | ||
| } else { | ||
| jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Locate UtilHelpers implementation(s)
fd -i UtilHelpers.java
# 2) Inspect buildSparkContext overloads and master resolution logic
rg -n --type=java -C6 '\bbuildSparkContext\s*\(' $(fd -i UtilHelpers.java)
rg -n --type=java -C8 'spark\.master|setMaster|getMaster|defaultMaster|master' $(fd -i UtilHelpers.java)
# 3) Confirm current usage in HudiHiveSyncJob
rg -n --type=java -C4 'sparkMaster|local\[2\]|buildSparkContext' hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.javaRepository: hudi-agent/hudi
Length of output: 11688
Parameter description needs clarification about spark.master precedence.
The current parameter description ("inherits from your environment") could mislead users about the fallback behavior. The code correctly uses sparkConf.get("spark.master", defaultMaster), which does respect the environment's spark.master setting first. However, if the environment has no spark.master defined, it defaults to "local[2]". Update the description to clarify the full precedence: "spark master to use; if not specified, inherits spark.master from environment (e.g., via spark-defaults.conf); if neither is provided, defaults to local[2]."
Same concern applies to lines 125–128.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
around lines 81 - 85, Update the parameter description for spark master used by
HudiHiveSyncJob to explicitly state precedence: when cfg.sparkMaster is provided
it is used; if not provided the code inherits spark.master from the environment
(e.g., spark-defaults.conf or SparkConf), and if that is also absent it falls
back to "local[2]". Change the description text wherever the current ambiguous
phrase appears (referencing cfg.sparkMaster and the calls around
UtilHelpers.buildSparkContext / sparkConf.get usage) so both places reflect this
exact precedence and default.
— CodeRabbit (original) (source:comment#3111728516)
| * Test cases for {@link HudiHiveSyncJob}. | ||
| */ | ||
| @Disabled("Hive set up in CI is failing") | ||
| public class TestHudiHiveSyncJob { |
There was a problem hiding this comment.
Feature test is fully disabled, so new behavior is unguarded in CI.
Please avoid shipping the new sync path with its only validation class-level disabled. Prefer running it in an integration/flaky profile (or env-gated assumption) rather than disabling the class entirely.
✅ Minimal direction to keep coverage without blocking default CI
-@Disabled("Hive set up in CI is failing")
+// Prefer tagging/routing this test to an integration profile instead of disabling it.
+@org.junit.jupiter.api.Tag("integration")
public class TestHudiHiveSyncJob {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public class TestHudiHiveSyncJob { | |
| // Prefer tagging/routing this test to an integration profile instead of disabling it. | |
| `@org.junit.jupiter.api.Tag`("integration") | |
| public class TestHudiHiveSyncJob { |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHudiHiveSyncJob.java`
around lines 62 - 63, The TestHudiHiveSyncJob test class is fully disabled via
the `@Disabled` annotation which leaves the new sync path untested in CI; remove
the class-level `@Disabled` and instead guard the test with a runtime assumption
or test profile: update TestHudiHiveSyncJob to remove or comment out the
`@Disabled` on the class and add an environment/profile-based gate (e.g.,
assumeTrue(System.getenv("RUN_HIVE_INTEGRATION") != null) or an
`@Tag/`@EnabledIfEnvironmentVariable equivalent) inside the class setup or
`@BeforeAll` so the tests only run when the CI job enables the Hive integration;
keep the class name TestHudiHiveSyncJob and any existing test methods unchanged
while ensuring the gating logic skips tests by default in standard CI.
— CodeRabbit (original) (source:comment#3111728523)
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18204 +/- ##
============================================
+ Coverage 68.46% 68.83% +0.37%
- Complexity 27472 28336 +864
============================================
Files 2427 2467 +40
Lines 132655 135867 +3212
Branches 15994 16482 +488
============================================
+ Hits 90819 93527 +2708
- Misses 34786 34941 +155
- Partials 7050 7399 +349
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Today, running Hive sync is typically coupled to ingestion/streaming execution paths. In operational scenarios, teams often need a lightweight way to trigger Hive sync externally and independently (for example, after backfills, manual data corrections, or quick metadata reconciliation).
This PR adds an external Hive sync utility job so users can execute Hive sync on demand without embedding the logic into another pipeline execution.
Summary and Changelog
HudiHiveSyncJobunderhudi-utilitiesas an external runner for Hive sync.HiveSyncTooldirectly.TestHudiHiveSyncJobvalidating:HudiHiveSyncJob,Impact
Risk Level
low
This is an additive utility with integration coverage. Existing sync flows remain unchanged.
Documentation Update
none
Contributor's checklist