Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,15 @@ public static void clear() throws IOException, HiveException, MetaException {
.setPayloadClass(HoodieAvroPayload.class)
.initTable(HadoopFSUtils.getStorageConfWithCopy(configuration), basePath);

for (String tableName : createdTablesSet) {
ddlExecutor.runSQL("drop table if exists " + tableName);
if (ddlExecutor != null) {
for (String tableName : createdTablesSet) {
ddlExecutor.runSQL("drop table if exists " + tableName);
}
}
createdTablesSet.clear();
ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade");
if (ddlExecutor != null) {
ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade");
}
}

public static HiveConf getHiveConf() {
Expand Down Expand Up @@ -226,6 +230,7 @@ public static void shutdown() {
try {
if (hiveServer != null) {
hiveServer.stop();
hiveServer = null;
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -235,6 +240,7 @@ public static void shutdown() {
try {
if (hiveTestService != null) {
hiveTestService.stop();
hiveTestService = null;
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -244,6 +250,7 @@ public static void shutdown() {
try {
if (zkServer != null) {
zkServer.shutdown(true);
zkServer = null;
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -253,6 +260,7 @@ public static void shutdown() {
try {
if (zkService != null) {
zkService.stop();
zkService = null;
}
} catch (RuntimeException re) {
re.printStackTrace();
Expand All @@ -262,6 +270,7 @@ public static void shutdown() {
try {
if (fileSystem != null) {
fileSystem.close();
fileSystem = null;
}
} catch (IOException ie) {
ie.printStackTrace();
Expand Down
Comment thread
nsivabalan marked this conversation as resolved.
Comment thread
nsivabalan marked this conversation as resolved.
Comment thread
nsivabalan marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncTool;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;

Comment thread
nsivabalan marked this conversation as resolved.
/**
* Utility job for running Hive sync on-demand for Hudi tables.
* <p>
* This tool allows you to synchronize Hudi table metadata with Hive metastore
* independently from ingestion workflows, useful for backfills, manual data
* corrections, or quick metadata reconciliation.
* <p>
* Example usage:
* <pre>
* spark-submit \
* --class org.apache.hudi.utilities.HudiHiveSyncJob \
* hudi-utilities.jar \
* --base-path /path/to/hudi/table \
* --base-file-format PARQUET \
* --props-file-path /path/to/hive-sync.properties \
* --hoodie-conf hoodie.datasource.hive_sync.database=my_db \
* --hoodie-conf hoodie.datasource.hive_sync.table=my_table
* </pre>
*/
public class HudiHiveSyncJob {

private static final Logger LOG = LoggerFactory.getLogger(HudiHiveSyncJob.class);

private final Config cfg;
private final Configuration hadoopConf;
private final TypedProperties props;

public HudiHiveSyncJob(JavaSparkContext jsc, Config cfg) {
this.cfg = cfg;
this.hadoopConf = jsc.hadoopConfiguration();
this.props = UtilHelpers.buildProperties(hadoopConf, cfg.propsFilePath, cfg.configs);
}

public static void main(String[] args) throws IOException {
final Config cfg = new Config();
new JCommander(cfg, null, args);
LOG.info("Cfg received: {}", cfg);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Sensitive values are being logged.

Line [79] and Line [101] can expose credentials because Config.toString() and props include user/password entries.

🔐 Proposed fix (redact or avoid secret-bearing logs)
-    LOG.info("Cfg received: {}", cfg);
+    LOG.info("Cfg received: basePath={}, baseFileFormat={}, propsFilePath={}, configsCount={}",
+        cfg.basePath, cfg.baseFileFormat, cfg.propsFilePath, cfg.configs.size());

@@
-      LOG.info("HiveSyncConfig props used to sync data {}", props);
+      LOG.info("HiveSyncConfig prepared for sync ({} entries)", props.size());

Also applies to: 101-101, 136-142

CodeRabbit (original) (source:comment#3075836235)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid logging full config/properties with credentials.

Both log lines can expose secrets passed via --hoodie-conf / properties (for example Hive password). Please log only non-sensitive fields and counts.

🔐 Suggested redaction-focused change
-    LOG.info("Cfg received: {}", cfg);
+    LOG.info("Cfg received: basePath={}, baseFileFormat={}, propsFilePath={}, hoodieConfCount={}",
+        cfg.basePath, cfg.baseFileFormat, cfg.propsFilePath, cfg.configs.size());
...
-      LOG.info("HiveSyncConfig props used to sync data {}", props);
+      LOG.info("HiveSyncConfig prepared: basePath={}, baseFileFormat={}, propCount={}",
+          cfg.basePath, cfg.baseFileFormat, props.size());

Also applies to: 101-101

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
at line 79, The current LOG.info calls in HudiHiveSyncJob that log the full cfg
object (e.g., LOG.info("Cfg received: {}", cfg); and the similar log at the
later occurrence) can expose secrets; change these to redact sensitive
properties by filtering the configuration before logging: build a sanitized
map/summary from cfg that removes or masks keys containing sensitive substrings
("password", "secret", "token", "key", "credentials", "auth") and only include
non-sensitive entries and counts (e.g., number of properties, database/table
names), then log that sanitized summary instead of cfg; implement the sanitizer
as a small helper used by the HudiHiveSyncJob logging sites to ensure consistent
redaction.

CodeRabbit (original) (source:comment#3111728505)

JavaSparkContext jsc;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The condition here is inverted. isNullOrEmpty is true when cfg.sparkMaster is blank (the default ""), so the if branch runs when no master was provided — and it passes the empty string to buildSparkContext. The else branch runs when the user does supply --spark-master yarn (or similar) and hardcodes local[2] instead of using their value. The branches need to be swapped:

if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) {
  jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true);
} else {
  jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true);
}

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) {
jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true);
} else {
jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify actual Spark context builder semantics and conventions in sibling utilities jobs.

set -euo pipefail

echo "== Locate UtilHelpers =="
fd -i "UtilHelpers.java"

echo
echo "== Inspect buildSparkContext declarations/implementations =="
rg -n -C3 --type=java '\bbuildSparkContext\s*\('

echo
echo "== Compare spark-master fallback patterns in utilities jobs =="
rg -n -C3 --type=java 'sparkMaster|local\[2\]|buildSparkContext\("Hudi'

Repository: yihua/hudi

Length of output: 50367


🏁 Script executed:

cat -n hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java | head -150

Repository: yihua/hudi

Length of output: 6583


Fix --spark-master fallback to honor environment configuration.

Lines 81–85 hard-code local[2] when cfg.sparkMaster is empty, but the parameter description at lines 125–127 states it should inherit from the environment when not defined. Update to call UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.enableHiveSupport) when cfg.sparkMaster is empty, matching the behavior of other utilities like HoodieStreamer and respecting documented configuration precedence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
around lines 81 - 85, The Spark master fallback currently hard-codes "local[2]"
when cfg.sparkMaster is empty; change the logic in HudiHiveSyncJob so that when
StringUtils.isNullOrEmpty(cfg.sparkMaster) it calls
UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.enableHiveSupport) instead
of using "local[2]" so the job inherits the environment/default Spark master
like HoodieStreamer; keep the existing branch that uses cfg.sparkMaster when
present and ensure you reference UtilHelpers.buildSparkContext and
cfg.enableHiveSupport in the updated call.

CodeRabbit (original) (source:comment#3075836239)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Locate UtilHelpers implementation(s)
fd -i UtilHelpers.java

# 2) Inspect buildSparkContext overloads and master resolution logic
rg -n --type=java -C6 '\bbuildSparkContext\s*\(' $(fd -i UtilHelpers.java)
rg -n --type=java -C8 'spark\.master|setMaster|getMaster|defaultMaster|master' $(fd -i UtilHelpers.java)

# 3) Confirm current usage in HudiHiveSyncJob
rg -n --type=java -C4 'sparkMaster|local\[2\]|buildSparkContext' hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java

Repository: hudi-agent/hudi

Length of output: 11688


Parameter description needs clarification about spark.master precedence.

The current parameter description ("inherits from your environment") could mislead users about the fallback behavior. The code correctly uses sparkConf.get("spark.master", defaultMaster), which does respect the environment's spark.master setting first. However, if the environment has no spark.master defined, it defaults to "local[2]". Update the description to clarify the full precedence: "spark master to use; if not specified, inherits spark.master from environment (e.g., via spark-defaults.conf); if neither is provided, defaults to local[2]."

Same concern applies to lines 125–128.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java`
around lines 81 - 85, Update the parameter description for spark master used by
HudiHiveSyncJob to explicitly state precedence: when cfg.sparkMaster is provided
it is used; if not provided the code inherits spark.master from the environment
(e.g., spark-defaults.conf or SparkConf), and if that is also absent it falls
back to "local[2]". Change the description text wherever the current ambiguous
phrase appears (referencing cfg.sparkMaster and the calls around
UtilHelpers.buildSparkContext / sparkConf.get usage) so both places reflect this
exact precedence and default.

CodeRabbit (original) (source:comment#3111728516)

try {
new HudiHiveSyncJob(jsc, cfg).run();
} finally {
jsc.stop();
}
}

public void run() throws IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Misleading throws IOException declaration

run() declares throws IOException, but it never actually throws a checked IOException. All exceptions from HiveSyncTool are caught in the try/catch block and re-thrown as unchecked HoodieException. The syncTool.close() in the finally block throws HoodieHiveSyncException (also unchecked). The throws IOException is dead and misleads callers into handling an exception that will never be checked-thrown.

Suggested change
public void run() throws IOException {
public void run() {

Greptile (original) (source:comment#3075833850)

LOG.info("Starting hive sync for {}", cfg.basePath);
HoodieTimer timer = HoodieTimer.start();
HiveSyncTool syncTool = null;
try {
props.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Mutable shared props modified in run()

props is a field of HudiHiveSyncJob that is built once in the constructor. Calling props.put(...) inside run() mutates the shared field on every invocation. These two properties are known at construction time and should be set in the constructor, keeping run() free of field mutations:

// In the constructor, after building props:
this.props.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
this.props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);

Greptile (original) (source:comment#3075834235)


LOG.info("HiveSyncConfig props used to sync data {}", props);
syncTool = new HiveSyncTool(props, new HiveConf(hadoopConf, HiveConf.class));
syncTool.syncHoodieTable();
} catch (Exception e) {
LOG.error("Exception in running hive-sync", e);
throw new HoodieException("Hive sync failed", e);
} finally {
if (syncTool != null) {
syncTool.close();
}
LOG.info("Hive-sync duration in ms {}", timer.endTimer());
Comment thread
nsivabalan marked this conversation as resolved.
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Exception suppression in finally block

If syncTool.syncHoodieTable() throws (caught and re-thrown as HoodieException), and then syncTool.close() also throws a HoodieHiveSyncException inside the finally block, Java will suppress the original HoodieException — the primary sync failure and its cause are silently lost, replaced by the close-failure. This makes debugging very confusing.

Since HiveSyncTool implements AutoCloseable, the cleanest fix is try-with-resources, which automatically adds suppressed exceptions:

public void run() {
    LOG.info("Starting hive sync for {}", cfg.basePath);
    HoodieTimer timer = HoodieTimer.start();
    props.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
    props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
    LOG.info("HiveSyncConfig props used to sync data {}", props);
    try (HiveSyncTool syncTool = new HiveSyncTool(props, new HiveConf(hadoopConf, HiveConf.class))) {
      syncTool.syncHoodieTable();
    } catch (Exception e) {
      LOG.error("Exception in running hive-sync", e);
      throw new HoodieException("Hive sync failed", e);
    } finally {
      LOG.info("Hive-sync duration in ms {}", timer.endTimer());
    }
}

Greptile (original) (source:comment#3075833789)


public static class Config implements Serializable {
@Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
public String basePath = null;

@Parameter(names = {"--base-file-format", "-bff"}, description = "Base file format of the dataset")
public String baseFileFormat = "PARQUET";

@Parameter(names = {"--props-file-path"}, description = "Path to properties file on localfs or dfs.")
public String propsFilePath = null;

@Parameter(names = {"--spark-master"},
description = "spark master to use, if not defined inherits from your environment taking into "
+ "account Spark Configuration priority rules (e.g. not using spark-submit command).")
public String sparkMaster = "";

@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();

@Override
public String toString() {
return "Config{"
+ "basePath='" + basePath + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", propsFilePath='" + propsFilePath + '\''
+ ", configs=" + configs
+ '}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.hive.testutils.HiveTestUtil;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.AfterAll;

import java.nio.file.Path;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Locale;
import java.util.UUID;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

/**
* Test cases for {@link HudiHiveSyncJob}.
*/
@Disabled("Hive set up in CI is failing")
public class TestHudiHiveSyncJob {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Feature test is fully disabled, so new behavior is unguarded in CI.

Please avoid shipping the new sync path with its only validation class-level disabled. Prefer running it in an integration/flaky profile (or env-gated assumption) rather than disabling the class entirely.

✅ Minimal direction to keep coverage without blocking default CI
-@Disabled("Hive set up in CI is failing")
+// Prefer tagging/routing this test to an integration profile instead of disabling it.
+@org.junit.jupiter.api.Tag("integration")
 public class TestHudiHiveSyncJob {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public class TestHudiHiveSyncJob {
// Prefer tagging/routing this test to an integration profile instead of disabling it.
`@org.junit.jupiter.api.Tag`("integration")
public class TestHudiHiveSyncJob {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHudiHiveSyncJob.java`
around lines 62 - 63, The TestHudiHiveSyncJob test class is fully disabled via
the `@Disabled` annotation which leaves the new sync path untested in CI; remove
the class-level `@Disabled` and instead guard the test with a runtime assumption
or test profile: update TestHudiHiveSyncJob to remove or comment out the
`@Disabled` on the class and add an environment/profile-based gate (e.g.,
assumeTrue(System.getenv("RUN_HIVE_INTEGRATION") != null) or an
`@Tag/`@EnabledIfEnvironmentVariable equivalent) inside the class setup or
`@BeforeAll` so the tests only run when the CI job enables the Hive integration;
keep the class name TestHudiHiveSyncJob and any existing test methods unchanged
while ensuring the gating logic skips tests by default in standard CI.

CodeRabbit (original) (source:comment#3111728523)


@TempDir
Path tempDir;

@BeforeEach
void setUp() throws Exception {
HiveTestUtil.setUp(Option.empty(), true);
}

@AfterEach
void cleanUp() {
try {
HiveTestUtil.clear();
} catch (Throwable t) {
// no-op for cleanup failures in tests
}
}

@AfterAll
static void cleanUpClass() {
HiveTestUtil.shutdown();
}

@Test
void testRunRegistersUnregisteredHudiDatasetInMetastore() throws Exception {
String tableName = "hive_sync_job_" + UUID.randomUUID().toString().replace("-", "");
String basePath = Files.createDirectory(tempDir.resolve("hudi-table")).toUri().toString();
String databaseName = "default";
HudiHiveSyncJob.Config cfg = new HudiHiveSyncJob.Config();
cfg.basePath = basePath;
cfg.baseFileFormat = "PARQUET";
cfg.configs.add("hoodie.datasource.hive_sync.database=" + databaseName);
cfg.configs.add("hoodie.datasource.hive_sync.table=" + tableName);
cfg.configs.add("hoodie.datasource.meta.sync.database=" + databaseName);
cfg.configs.add("hoodie.datasource.meta.sync.table=" + tableName);
cfg.configs.add(HIVE_SYNC_MODE.key() + "=jdbc");
cfg.configs.add(HIVE_URL.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_URL.key()));
cfg.configs.add(HIVE_USER.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_USER.key()));
cfg.configs.add(HIVE_PASS.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_PASS.key()));

JavaSparkContext jsc = null;
SparkSession spark = null;
try {
jsc = UtilHelpers.buildSparkContext("test-hudi-hive-sync-job", "local[2]", false);
spark = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();

StructType schema = new StructType()
.add("id", DataTypes.StringType, false)
.add("name", DataTypes.StringType, true)
.add("ts", DataTypes.LongType, false);
Dataset<Row> source = spark.createDataFrame(
Arrays.asList(
RowFactory.create("1", "a1", 1000L),
RowFactory.create("2", "a2", 1001L)),
schema);

// Write Hudi dataset by path only: this should create commits but not register a metastore table.
source.write().format("hudi")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option("hoodie.datasource.write.partitionpath.field", "")
.mode("overwrite")
.save(basePath);

assertFalse(tableExists(databaseName, tableName, basePath, spark));

new HudiHiveSyncJob(jsc, cfg).run();

assertTrue(tableExists(databaseName, tableName, basePath, spark));
} finally {
if (spark != null) {
spark.close();
}
if (jsc != null) {
jsc.stop();
}
}
}

private boolean tableExists(String dbName, String tableName, String basePath, SparkSession spark) {
String catalogImpl = spark.conf().get("spark.sql.catalogImplementation", "in-memory")
.toLowerCase(Locale.ROOT);
if ("hive".equals(catalogImpl)) {
return spark.catalog().tableExists(dbName, tableName);
}
return tableExistsInMetastore(dbName, tableName, basePath);
}

private boolean tableExistsInMetastore(String dbName, String tableName, String basePath) {
TypedProperties props = TypedProperties.copy(HiveTestUtil.hiveSyncProps);
props.setProperty(META_SYNC_DATABASE_NAME.key(), dbName);
props.setProperty(META_SYNC_TABLE_NAME.key(), tableName);
props.setProperty(META_SYNC_BASE_PATH.key(), basePath);
try (HoodieHiveSyncClient client = new HoodieHiveSyncClient(
new HiveSyncConfig(props, HiveTestUtil.getHiveConf()),
mock(HoodieTableMetaClient.class))) {
return client.tableExists(tableName);
}
}
}
Loading