diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 1ab5bb5224229..4d75ca0e6b420 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -193,11 +193,15 @@ public static void clear() throws IOException, HiveException, MetaException { .setPayloadClass(HoodieAvroPayload.class) .initTable(HadoopFSUtils.getStorageConfWithCopy(configuration), basePath); - for (String tableName : createdTablesSet) { - ddlExecutor.runSQL("drop table if exists " + tableName); + if (ddlExecutor != null) { + for (String tableName : createdTablesSet) { + ddlExecutor.runSQL("drop table if exists " + tableName); + } } createdTablesSet.clear(); - ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade"); + if (ddlExecutor != null) { + ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade"); + } } public static HiveConf getHiveConf() { @@ -226,6 +230,7 @@ public static void shutdown() { try { if (hiveServer != null) { hiveServer.stop(); + hiveServer = null; } } catch (Exception e) { e.printStackTrace(); @@ -235,6 +240,7 @@ public static void shutdown() { try { if (hiveTestService != null) { hiveTestService.stop(); + hiveTestService = null; } } catch (Exception e) { e.printStackTrace(); @@ -244,6 +250,7 @@ public static void shutdown() { try { if (zkServer != null) { zkServer.shutdown(true); + zkServer = null; } } catch (Exception e) { e.printStackTrace(); @@ -253,6 +260,7 @@ public static void shutdown() { try { if (zkService != null) { zkService.stop(); + zkService = null; } } catch (RuntimeException re) { re.printStackTrace(); @@ -262,6 +270,7 @@ public static void shutdown() { try { if (fileSystem != null) { fileSystem.close(); + fileSystem = null; } } catch (IOException ie) { ie.printStackTrace(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java new file mode 100644 index 0000000000000..59d92658f4f81 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HudiHiveSyncJob.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncTool; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; + +/** + * Utility job for running Hive sync on-demand for Hudi tables. + *

+ * This tool allows you to synchronize Hudi table metadata with Hive metastore + * independently from ingestion workflows, useful for backfills, manual data + * corrections, or quick metadata reconciliation. + *

+ * Example usage: + *

+ * spark-submit \
+ *   --class org.apache.hudi.utilities.HudiHiveSyncJob \
+ *   hudi-utilities.jar \
+ *   --base-path /path/to/hudi/table \
+ *   --base-file-format PARQUET \
+ *   --props-file-path /path/to/hive-sync.properties \
+ *   --hoodie-conf hoodie.datasource.hive_sync.database=my_db \
+ *   --hoodie-conf hoodie.datasource.hive_sync.table=my_table
+ * 
+ */ +public class HudiHiveSyncJob { + + private static final Logger LOG = LoggerFactory.getLogger(HudiHiveSyncJob.class); + + private final Config cfg; + private final Configuration hadoopConf; + private final TypedProperties props; + + public HudiHiveSyncJob(JavaSparkContext jsc, Config cfg) { + this.cfg = cfg; + this.hadoopConf = jsc.hadoopConfiguration(); + this.props = UtilHelpers.buildProperties(hadoopConf, cfg.propsFilePath, cfg.configs); + } + + public static void main(String[] args) throws IOException { + final Config cfg = new Config(); + new JCommander(cfg, null, args); + LOG.info("Cfg received: {}", cfg); + JavaSparkContext jsc; + if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) { + jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", "local[2]", true); + } else { + jsc = UtilHelpers.buildSparkContext("HudiHiveSyncJob", cfg.sparkMaster, true); + } + try { + new HudiHiveSyncJob(jsc, cfg).run(); + } finally { + jsc.stop(); + } + } + + public void run() throws IOException { + LOG.info("Starting hive sync for {}", cfg.basePath); + HoodieTimer timer = HoodieTimer.start(); + HiveSyncTool syncTool = null; + try { + props.put(META_SYNC_BASE_PATH.key(), cfg.basePath); + props.put(META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat); + + LOG.info("HiveSyncConfig props used to sync data {}", props); + syncTool = new HiveSyncTool(props, new HiveConf(hadoopConf, HiveConf.class)); + syncTool.syncHoodieTable(); + } catch (Exception e) { + LOG.error("Exception in running hive-sync", e); + throw new HoodieException("Hive sync failed", e); + } finally { + if (syncTool != null) { + syncTool.close(); + } + LOG.info("Hive-sync duration in ms {}", timer.endTimer()); + } + } + + public static class Config implements Serializable { + @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true) + public String basePath = null; + + @Parameter(names = {"--base-file-format", "-bff"}, description = "Base file format of the dataset") + public String baseFileFormat = "PARQUET"; + + @Parameter(names = {"--props-file-path"}, description = "Path to properties file on localfs or dfs.") + public String propsFilePath = null; + + @Parameter(names = {"--spark-master"}, + description = "spark master to use, if not defined inherits from your environment taking into " + + "account Spark Configuration priority rules (e.g. not using spark-submit command).") + public String sparkMaster = ""; + + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", + splitter = IdentitySplitter.class) + public List configs = new ArrayList<>(); + + @Override + public String toString() { + return "Config{" + + "basePath='" + basePath + '\'' + + ", baseFileFormat='" + baseFileFormat + '\'' + + ", propsFilePath='" + propsFilePath + '\'' + + ", configs=" + configs + + '}'; + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHudiHiveSyncJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHudiHiveSyncJob.java new file mode 100644 index 0000000000000..64abaf68a8db8 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHudiHiveSyncJob.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncClient; +import org.apache.hudi.hive.testutils.HiveTestUtil; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.AfterAll; + +import java.nio.file.Path; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Locale; +import java.util.UUID; + +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Test cases for {@link HudiHiveSyncJob}. + */ +@Disabled("Hive set up in CI is failing") +public class TestHudiHiveSyncJob { + + @TempDir + Path tempDir; + + @BeforeEach + void setUp() throws Exception { + HiveTestUtil.setUp(Option.empty(), true); + } + + @AfterEach + void cleanUp() { + try { + HiveTestUtil.clear(); + } catch (Throwable t) { + // no-op for cleanup failures in tests + } + } + + @AfterAll + static void cleanUpClass() { + HiveTestUtil.shutdown(); + } + + @Test + void testRunRegistersUnregisteredHudiDatasetInMetastore() throws Exception { + String tableName = "hive_sync_job_" + UUID.randomUUID().toString().replace("-", ""); + String basePath = Files.createDirectory(tempDir.resolve("hudi-table")).toUri().toString(); + String databaseName = "default"; + HudiHiveSyncJob.Config cfg = new HudiHiveSyncJob.Config(); + cfg.basePath = basePath; + cfg.baseFileFormat = "PARQUET"; + cfg.configs.add("hoodie.datasource.hive_sync.database=" + databaseName); + cfg.configs.add("hoodie.datasource.hive_sync.table=" + tableName); + cfg.configs.add("hoodie.datasource.meta.sync.database=" + databaseName); + cfg.configs.add("hoodie.datasource.meta.sync.table=" + tableName); + cfg.configs.add(HIVE_SYNC_MODE.key() + "=jdbc"); + cfg.configs.add(HIVE_URL.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_URL.key())); + cfg.configs.add(HIVE_USER.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_USER.key())); + cfg.configs.add(HIVE_PASS.key() + "=" + HiveTestUtil.hiveSyncProps.getString(HIVE_PASS.key())); + + JavaSparkContext jsc = null; + SparkSession spark = null; + try { + jsc = UtilHelpers.buildSparkContext("test-hudi-hive-sync-job", "local[2]", false); + spark = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate(); + + StructType schema = new StructType() + .add("id", DataTypes.StringType, false) + .add("name", DataTypes.StringType, true) + .add("ts", DataTypes.LongType, false); + Dataset source = spark.createDataFrame( + Arrays.asList( + RowFactory.create("1", "a1", 1000L), + RowFactory.create("2", "a2", 1001L)), + schema); + + // Write Hudi dataset by path only: this should create commits but not register a metastore table. + source.write().format("hudi") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.precombine.field", "ts") + .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + .option("hoodie.datasource.write.partitionpath.field", "") + .mode("overwrite") + .save(basePath); + + assertFalse(tableExists(databaseName, tableName, basePath, spark)); + + new HudiHiveSyncJob(jsc, cfg).run(); + + assertTrue(tableExists(databaseName, tableName, basePath, spark)); + } finally { + if (spark != null) { + spark.close(); + } + if (jsc != null) { + jsc.stop(); + } + } + } + + private boolean tableExists(String dbName, String tableName, String basePath, SparkSession spark) { + String catalogImpl = spark.conf().get("spark.sql.catalogImplementation", "in-memory") + .toLowerCase(Locale.ROOT); + if ("hive".equals(catalogImpl)) { + return spark.catalog().tableExists(dbName, tableName); + } + return tableExistsInMetastore(dbName, tableName, basePath); + } + + private boolean tableExistsInMetastore(String dbName, String tableName, String basePath) { + TypedProperties props = TypedProperties.copy(HiveTestUtil.hiveSyncProps); + props.setProperty(META_SYNC_DATABASE_NAME.key(), dbName); + props.setProperty(META_SYNC_TABLE_NAME.key(), tableName); + props.setProperty(META_SYNC_BASE_PATH.key(), basePath); + try (HoodieHiveSyncClient client = new HoodieHiveSyncClient( + new HiveSyncConfig(props, HiveTestUtil.getHiveConf()), + mock(HoodieTableMetaClient.class))) { + return client.tableExists(tableName); + } + } +}