stdOutErrPair = executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true);
- assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|");
- assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
- + "|default |stock_ticks_derived_mor_ro|false |\n"
- + "|default |stock_ticks_derived_mor_rt|false |\n"
- + "|default |stock_ticks_mor_ro |false |\n"
- + "|default |stock_ticks_mor_rt |false |\n"
- + "| |stock_ticks_cow_incr |true |");
- assertStdOutContains(stdOutErrPair, "|count(1)|\n+--------+\n|99 |", 2);
+ assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|", 2);
+ assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
+ + "|default |stock_ticks_cow_bs |false |\n"
+ + "|default |stock_ticks_derived_mor_bs_ro|false |\n"
+ + "|default |stock_ticks_derived_mor_bs_rt|false |\n"
+ + "|default |stock_ticks_derived_mor_ro |false |\n"
+ + "|default |stock_ticks_derived_mor_rt |false |\n"
+ + "|default |stock_ticks_mor_bs_ro |false |\n"
+ + "|default |stock_ticks_mor_bs_rt |false |"
+ + "|default |stock_ticks_mor_ro |false |\n"
+ + "|default |stock_ticks_mor_rt |false |");
+ assertStdOutContains(stdOutErrPair, "|count(1)|\n+--------+\n|99 |", 4);
}
private void scheduleAndRunCompaction() throws Exception {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index b51805f91b0e7..e80c66e9522a3 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,13 +18,18 @@
package org.apache.hudi;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -34,6 +39,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
@@ -41,6 +47,8 @@
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -55,6 +63,8 @@
*/
public class DataSourceUtils {
+ private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class);
+
/**
* Create a key generator class via reflection, passing in any configs needed.
*
@@ -212,4 +222,58 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
return hiveSyncConfig;
}
+
+ public static String getTablePath(FileSystem fs, Path[] paths) throws IOException {
+ LOG.info("Getting table path..");
+ for (Path path: paths) {
+ FileStatus fileStatus = fs.getFileStatus(path);
+ Option tablePath;
+
+ if (fileStatus.isFile()) {
+ tablePath = getTablePathFromFile(fs, fileStatus);
+ } else {
+ tablePath = getTablePathFromDir(fs, fileStatus);
+ }
+
+ if (tablePath.isPresent()) {
+ return tablePath.get().toString();
+ }
+ }
+
+ throw new TableNotFoundException("Cannot find Hudi table for the path provided");
+ }
+
+ private static Option getTablePathFromFile(FileSystem fs, FileStatus fileStatus) throws IOException {
+ LOG.info("Getting table path from file path : " + fileStatus.getPath());
+ Path filePath = fileStatus.getPath();
+ String filePathStr = filePath.toString();
+
+ if (filePathStr.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")) {
+ // Handle file inside metadata folder
+ Path tablePath = new Path(filePathStr);
+ while (!tablePath.toString().endsWith(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ tablePath = tablePath.getParent();
+ }
+ return Option.of(tablePath.getParent());
+ } else if (HoodiePartitionMetadata.hasPartitionMetadata(fs, filePath.getParent())) {
+ // Handle partition path
+ Path partitionPath = filePath.getParent();
+ HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
+ metadata.readFromFS();
+ return Option.of(HoodieHiveUtil.getNthParent(partitionPath, metadata.getPartitionDepth()));
+ }
+
+ return Option.empty();
+ }
+
+ private static Option getTablePathFromDir(FileSystem fs, FileStatus fileStatus) throws IOException {
+ System.out.println("Getting table path from directory path : " + fileStatus.getPath().toString());
+ Path tablePath = new Path(fileStatus.getPath().toString());
+
+ while (tablePath != null && !fs.exists(new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+ tablePath = tablePath.getParent();
+ }
+
+ return tablePath == null ? Option.empty() : Option.of(tablePath);
+ }
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index e810ff1779dcb..af4504573ffb4 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -38,6 +38,8 @@ public class ComplexKeyGenerator extends KeyGenerator {
protected final boolean hiveStylePartitioning;
+ protected final boolean encodePartitionPath;
+
public ComplexKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","))
@@ -47,6 +49,8 @@ public ComplexKeyGenerator(TypedProperties props) {
.stream().map(String::trim).collect(Collectors.toList());
this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
+ this.encodePartitionPath = props.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
+ Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
}
@Override
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index a9df3ee751db6..8568d4eaccecf 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -37,12 +37,16 @@ public class SimpleKeyGenerator extends KeyGenerator {
protected final boolean hiveStylePartitioning;
+ protected final boolean encodePartitionPath;
+
public SimpleKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
this.partitionPathField = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());
this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
+ this.encodePartitionPath = props.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
+ Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
}
@Override
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3d1172f0f6fa8..d7431473f9581 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -50,6 +50,8 @@ object DataSourceReadOptions {
val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
+ val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths"
+
@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
@@ -129,6 +131,7 @@ object DataSourceWriteOptions {
val INSERT_OPERATION_OPT_VAL = "insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val DELETE_OPERATION_OPT_VAL = "delete"
+ val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**
@@ -207,7 +210,8 @@ object DataSourceWriteOptions {
*/
val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"
val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"
-
+ val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"
+ val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"
/**
* Key generator class, that implements will extract the key out of incoming record
*
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index fbdd4ea9cfb1b..e003232c838a3 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -17,10 +17,16 @@
package org.apache.hudi
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import org.apache.spark.sql.execution.streaming.Sink
@@ -54,29 +60,54 @@ class DefaultSource extends RelationProvider
val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
val path = parameters.get("path")
- if (path.isEmpty) {
- throw new HoodieException("'path' must be specified.")
- }
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
- // this is just effectively RO view only, where `path` can contain a mix of
- // non-hoodie/hoodie path files. set the path filter up
- sqlContext.sparkContext.hadoopConfiguration.setClass(
- "mapreduce.input.pathFilter.class",
- classOf[HoodieROTablePathFilter],
- classOf[org.apache.hadoop.fs.PathFilter])
-
- log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
- log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
- "Please query the Hive table registered using Spark SQL.")
- // simply return as a regular parquet relation
- DataSource.apply(
- sparkSession = sqlContext.sparkSession,
- userSpecifiedSchema = Option(schema),
- className = "parquet",
- options = parameters)
- .resolveRelation()
+ val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+ if (path.isEmpty && readPathsStr.isEmpty) {
+ throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
+ }
+
+ val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+ val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
+
+ val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
+ val globPaths = checkAndGlobPathIfNecessary(allPaths, fs)
+
+ val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+ log.info("Obtained hudi table path: " + tablePath)
+
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+ val bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient)
+ log.info("Bootstrap Index Available: " + bootstrapIndex.isIndexAvailable)
+
+ if (bootstrapIndex.isIndexAvailable) {
+ // For bootstrapped tables, use our custom Spark relation for querying
+ new HudiBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
+ } else {
+ // this is just effectively RO view only, where `path` can contain a mix of
+ // non-hoodie/hoodie path files. set the path filter up
+ sqlContext.sparkContext.hadoopConfiguration.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[HoodieROTablePathFilter],
+ classOf[org.apache.hadoop.fs.PathFilter])
+
+ log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
+ log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
+ "Please query the Hive table registered using Spark SQL.")
+ // simply return as a regular parquet relation
+ DataSource.apply(
+ sparkSession = sqlContext.sparkSession,
+ paths = readPaths,
+ userSpecifiedSchema = Option(schema),
+ className = "parquet",
+ options = parameters)
+ .resolveRelation()
+ }
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
+ if (path.isEmpty) {
+ throw new HoodieException("'path' must be specified for incremental query.")
+ }
+
new IncrementalRelation(sqlContext, path.get, optParams, schema)
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
@@ -105,7 +136,12 @@ class DefaultSource extends RelationProvider
df: DataFrame): BaseRelation = {
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
- HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
+
+ if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
+ HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
+ } else {
+ HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
+ }
new HudiEmptyRelation(sqlContext, df.schema)
}
@@ -122,5 +158,13 @@ class DefaultSource extends RelationProvider
outputMode)
}
+ private def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
+ paths.flatMap(path => {
+ val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
+ val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+ globPaths
+ })
+ }
+
override def shortName(): String = "hudi"
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e1bfe877559c5..b7e7bf063afee 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
@@ -34,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.log4j.LogManager
+import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -60,7 +62,6 @@ private[hudi] object HoodieSparkSqlWriter {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
- val tableType = parameters(TABLE_TYPE_OPT_KEY)
val operation =
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
@@ -112,25 +113,7 @@ private[hudi] object HoodieSparkSqlWriter {
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie table at $basePath already exists.")
- }
- if (mode == SaveMode.Ignore && exists) {
- log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
- (true, common.util.Option.empty())
- }
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
-
- // Create the table if not present
- if (!exists) {
- HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
- tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
- }
+ initTable(mode, basePath, fs, exists, sparkContext, parameters)
// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
@@ -190,6 +173,37 @@ private[hudi] object HoodieSparkSqlWriter {
(writeSuccessful, common.util.Option.ofNullable(instantTime))
}
+ def bootstrap(sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ df: DataFrame): Unit = {
+
+ val sparkContext = sqlContext.sparkContext
+ val path = parameters.get("path")
+ val tableName = parameters.get(HoodieWriteConfig.TABLE_NAME)
+
+ var schema: String = null
+ if (df.schema.nonEmpty) {
+ val structName = s"${tableName.get}_record"
+ val nameSpace = s"hoodie.${tableName.get}"
+ schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace).toString
+ } else {
+ schema = HoodieAvroUtils.getNullSchema.toString
+ }
+
+ val basePath = new Path(parameters("path"))
+ val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+ val exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+
+ initTable(mode, basePath, fs, exists, sparkContext, parameters)
+
+ val jsc = new JavaSparkContext(sqlContext.sparkContext)
+ val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path.get, tableName.get,
+ mapAsJavaMap(parameters))
+ writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+ syncHiveIfEnabled(basePath, jsc, parameters)
+ }
+
/**
* Add default options for unspecified write options keys.
*
@@ -228,6 +242,42 @@ private[hudi] object HoodieSparkSqlWriter {
props
}
+ private def initTable(mode: SaveMode, basePath: Path, fs: FileSystem, tableExists: Boolean,
+ sparkContext: SparkContext, parameters: Map[String, String]): Unit = {
+ val tableName = parameters.get(HoodieWriteConfig.TABLE_NAME)
+ val tableType = parameters(TABLE_TYPE_OPT_KEY)
+
+ // Handle various save modes
+ if (mode == SaveMode.ErrorIfExists && tableExists) {
+ throw new HoodieException(s"hoodie table at $basePath already exists.")
+ }
+ if (mode == SaveMode.Ignore && tableExists) {
+ log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+ (true, common.util.Option.empty())
+ }
+ if (mode == SaveMode.Overwrite && tableExists) {
+ log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+ fs.delete(basePath, true)
+ }
+
+ // Create the table if not present
+ if (!tableExists) {
+ HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, basePath.toString, tableType,
+ tableName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
+ }
+ }
+
+ private def syncHiveIfEnabled(basePath: Path, jsc: JavaSparkContext, parameters: Map[String, String]): Boolean = {
+ val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ if (hiveSyncEnabled) {
+ log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+ val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+ syncHive(basePath, fs, parameters)
+ } else {
+ true
+ }
+ }
+
private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
val hiveConf: HiveConf = new HiveConf()
@@ -279,16 +329,9 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("Commit " + instantTime + " failed!")
}
- val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
- val syncHiveSucess = if (hiveSyncEnabled) {
- log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
- val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
- syncHive(basePath, fs, parameters)
- } else {
- true
- }
+ val syncHiveSuccess = syncHiveIfEnabled(basePath, jsc, parameters)
client.close()
- commitSuccess && syncHiveSucess
+ commitSuccess && syncHiveSuccess
} else {
log.error(s"$operation failed with $errorCount errors :")
if (log.isTraceEnabled) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
new file mode 100644
index 0000000000000..05c66821859e8
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ skeletonReadFunction: PartitionedFile => Iterator[Any],
+ regularReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ skeletonSchema: StructType,
+ requiredColumns: Array[String],
+ tableState: HudiBootstrapTableState)
+ extends RDD[InternalRow](spark.sparkContext, Nil) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ val bootstrapPartition = split.asInstanceOf[HudiBootstrapPartition]
+
+ if (bootstrapPartition.split.skeletonFile.isDefined) {
+ logInfo("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+ + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
+ + bootstrapPartition.split.skeletonFile.get.filePath)
+ } else {
+ logInfo("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+ + bootstrapPartition.split.dataFile.filePath)
+ }
+
+ var partitionedFileIterator: Iterator[InternalRow] = null
+
+ if (bootstrapPartition.split.skeletonFile.isDefined) {
+ val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+ val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+ partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
+ } else {
+ partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
+ }
+
+ partitionedFileIterator
+ }
+
+ def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
+ : Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
+ override def next(): InternalRow = {
+ mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
+ }
+ }
+ }
+
+ def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
+ val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
+ val dataArr = dataRow.copy().toSeq(dataSchema)
+ // We need to return it in the order requested
+ val mergedArr = requiredColumns.map(col => {
+ if (skeletonSchema.fieldNames.contains(col)) {
+ val idx = skeletonSchema.fieldIndex(col)
+ skeletonArr(idx)
+ } else {
+ val idx = dataSchema.fieldIndex(col)
+ dataArr(idx)
+ }
+ })
+
+ logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
+ val mergedRow = InternalRow.fromSeq(mergedArr)
+ mergedRow
+ }
+
+ def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
+ : Iterator[InternalRow] = {
+ val fileIterator = readFileFunction(partitionedFile)
+
+ import scala.collection.JavaConverters._
+
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ logInfo("Getting partitions..")
+
+ tableState.files.zipWithIndex.map(file => {
+ if (file._1.skeletonFile.isDefined) {
+ logInfo("Forming partition with => " + file._2 + "," + file._1.dataFile.filePath
+ + "," + file._1.skeletonFile.get.filePath)
+ HudiBootstrapPartition(file._2, file._1)
+ } else {
+ logInfo("Forming partition with => " + file._2 + "," + file._1.dataFile.filePath)
+ HudiBootstrapPartition(file._2, file._1)
+ }
+ }).toArray
+ }
+}
+
+case class HudiBootstrapPartition(index: Int, split: HudiBootstrapSplit) extends Partition
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala
new file mode 100644
index 0000000000000..b7aa438129e66
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.JavaConverters._
+
+class HudiBootstrapRelation(@transient val _sqlContext: SQLContext,
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient,
+ val optParams: Map[String, String]) extends BaseRelation
+ with PrunedFilteredScan with Logging {
+
+ val fileIndex: HudiBootstrapFileIndex = buildFileIndex()
+
+ val skeletonSchema: StructType = StructType(Seq(
+ StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType, nullable = true)
+ ))
+
+ var dataSchema: StructType = _
+
+ var completeSchema: StructType = _
+
+ override def sqlContext: SQLContext = _sqlContext
+
+ override val needConversion: Boolean = false
+
+ override def schema: StructType = inferFullSchema()
+
+ /**
+ * Implementing PrunedScan to support column pruning, by reading only the required columns from the parquet files
+ * instead by passing them down to the ParquetFileFormat.
+ *
+ * TODO: To get better performance with Filters we should implement PrunedFilteredScan push filters down to the
+ * parquet files. But this is much more tricky to implement because then with filters being pushed down, unequal
+ * number od rows may be returned by external data reader, and skeleton file readers. Merging in this scenario
+ * will become much more complicated.
+ *
+ * @param requiredColumns This contains the columns user has passed in select() or filter() operations on the
+ * dataframe
+ * @return
+ */
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ logInfo("Starting scan..")
+ filters.foreach(filter => logInfo("Obtained filter: " + filter.references.mkString(",") + " "
+ + filter.getClass))
+
+ // Compute splits
+ val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
+ var skeletonFile: Option[PartitionedFile] = Option.empty
+ var dataFile: PartitionedFile = null
+
+ if (hoodieBaseFile.getExternalBaseFile.isPresent) {
+ skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
+ dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getExternalBaseFile.get().getPath, 0,
+ hoodieBaseFile.getExternalBaseFile.get().getFileLen)
+ } else {
+ dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
+ }
+ HudiBootstrapSplit(dataFile, skeletonFile)
+ })
+ val tableState = HudiBootstrapTableState(bootstrapSplits)
+
+ // Get required schemas for column pruning
+ val requiredDataSchema = StructType(dataSchema.filter(field => requiredColumns.contains(field.name)))
+ val requiredSkeletonSchema = StructType(skeletonSchema.filter(field => requiredColumns.contains(field.name)))
+ val requiredRegularSchema = StructType(requiredColumns.map(col => {
+ completeSchema.find(_.name == col).get
+ }))
+
+ // Prepare readers for reading data file and skeleton files
+ val dataReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = dataSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredDataSchema,
+ filters = Nil,
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+
+ val skeletonReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = skeletonSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredSkeletonSchema,
+ filters = Nil,
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+
+ val regularReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = completeSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredRegularSchema,
+ filters = filters,
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+
+ val rdd = new HudiBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
+ regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
+
+ logInfo("Number of partitions for HudiBootstrapRDD => " + rdd.partitions.length)
+ rdd.asInstanceOf[RDD[Row]]
+ }
+
+ def inferFullSchema(): StructType = {
+ if (completeSchema == null) {
+ logInfo("Inferring schema..")
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ completeSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
+ }
+ completeSchema
+ }
+
+ def buildFileIndex(): HudiBootstrapFileIndex = {
+ logInfo("Building file index..")
+ val inMemoryFileIndex = createInMemoryFileIndex(globPaths)
+ val fileStatuses = inMemoryFileIndex.allFiles()
+
+ if (fileStatuses.isEmpty) {
+ throw new RuntimeException("No files found for reading.")
+ }
+
+ val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
+ .filterCompletedInstants, fileStatuses.toArray)
+ val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+ latestFiles.foreach(file => logInfo("Skeleton file path: " + file.getPath))
+ latestFiles.filter(_.getExternalBaseFile.isPresent).foreach(file => {
+ logInfo("External data file path: " + file.getExternalBaseFile.get().getPath)
+ })
+
+ HudiBootstrapFileIndex(latestFiles)
+ }
+
+ private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
+ val fileStatusCache = FileStatusCache.getOrCreate(_sqlContext.sparkSession)
+ new InMemoryFileIndex(_sqlContext.sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
+ }
+}
+
+case class HudiBootstrapFileIndex(files: List[HoodieBaseFile])
+
+case class HudiBootstrapTableState(files: List[HudiBootstrapSplit])
+
+case class HudiBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 436895bda3499..e796900544e49 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,19 +17,24 @@
package org.apache.hudi
+import com.google.common.collect.Lists
+import org.apache.avro.Schema
import org.apache.hadoop.fs.GlobPattern
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -47,6 +52,14 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
+ val skeletonSchema: StructType = StructType(Seq(
+ StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType, nullable = true),
+ StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType, nullable = true)
+ ))
+
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -71,13 +84,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
.getInstants.iterator().toList
- // use schema from latest metadata, if not present, read schema from the data file
- private val latestSchema = {
- val schemaUtil = new TableSchemaResolver(metaClient)
- val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
- AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ // use schema from a file produced in the latest instant
+ val latestSchema: StructType = {
+ log.info("Inferring schema..")
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ StructType(skeletonSchema.fields ++ dataSchema.fields)
}
+
private val filters = {
if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
val filterStr = optParams.getOrElse(
@@ -92,36 +108,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
override def schema: StructType = latestSchema
override def buildScan(): RDD[Row] = {
- val fileIdToFullPath = mutable.HashMap[String, String]()
+ val regularFileIdToFullPath = mutable.HashMap[String, String]()
+ var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
for (commit <- commitsToReturn) {
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
.get, classOf[HoodieCommitMetadata])
- fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+ if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+ metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+ } else {
+ regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+ }
+ }
+
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ // filer out meta bootstrap files that have had more commits since metadata bootstrap
+ metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+ .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
}
+
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
- val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
- val globMatcher = new GlobPattern("*" + pathGlobPattern)
- fileIdToFullPath.filter(p => globMatcher.matches(p._2))
- } else {
- fileIdToFullPath
+ val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+ if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+ val globMatcher = new GlobPattern("*" + pathGlobPattern)
+ (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+ metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+ } else {
+ (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
+ }
}
// unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
// will filter out all the files incorrectly.
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
- if (filteredFullPath.isEmpty) {
+ if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters)
- filters.foldLeft(sqlContext.read.options(sOpts)
- .schema(latestSchema)
- .parquet(filteredFullPath.values.toList: _*)
- .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'",
- HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
- .toDF().rdd
+
+ var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
+
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ df = sqlContext.sparkSession.read
+ .format("hudi")
+ .schema(latestSchema)
+ .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
+ .load()
+ }
+
+ if (regularFileIdToFullPath.nonEmpty)
+ {
+ df = df.union(sqlContext.read.options(sOpts)
+ .schema(latestSchema)
+ .parquet(filteredRegularFullPaths.toList: _*)
+ .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.head.getTimestamp))
+ .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.last.getTimestamp)))
+ }
+
+ filters.foldLeft(df)((e, f) => e.filter(f)).rdd
}
}
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 495e8b057897b..e8eefdcdd61ee 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -114,6 +114,11 @@ public void setUp() throws Exception {
srcPath = tmpFolder.toAbsolutePath().toString() + "/data";
+ // initialize parquet input format
+ reloadInputFormats();
+ }
+
+ private void reloadInputFormats() {
// initialize parquet input format
roInputFormat = new HoodieParquetInputFormat();
roJobConf = new JobConf(jsc.hadoopConfiguration());
@@ -165,7 +170,7 @@ public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
.withSchema(schema.toString())
.withBootstrapModeSelector(MetadataOnlyBootstrapModeSelector.class.getName()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, true, 1, timestamp,
timestamp, false);
@@ -183,7 +188,7 @@ public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -223,7 +228,7 @@ public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
.withBootstrapModeSelector(MetadataOnlyBootstrapModeSelector.class.getName())
.build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, true, 1, timestamp,
timestamp, false);
@@ -241,7 +246,7 @@ public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -282,7 +287,7 @@ public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
.withBootstrapModeSelector(MetadataOnlyBootstrapModeSelector.class.getName()).build();
System.out.println("Config Props :" + config.getProps().getProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()));
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, true, 1,
timestamp, timestamp, false);
// Rollback Bootstrap
@@ -299,7 +304,7 @@ public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -341,7 +346,7 @@ public void testFullBoostrapOnlyCOW() throws Exception {
.withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
.withBootstrapModeSelector(FullBootstrapModeSelector.class.getName()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, false, 1, timestamp,
timestamp, false);
// Rollback Bootstrap
@@ -358,7 +363,7 @@ public void testFullBoostrapOnlyCOW() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -395,7 +400,7 @@ public void testFullBootstrapWithUpdatesMOR() throws Exception {
.withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
.withBootstrapModeSelector(FullBootstrapModeSelector.class.getName()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, false, 1, timestamp,
timestamp, false);
// Rollback Bootstrap
@@ -412,7 +417,7 @@ public void testFullBootstrapWithUpdatesMOR() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -454,7 +459,7 @@ public void testMetaAndFullBoostrapCOW() throws Exception {
.withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
.withBootstrapModeSelector(TestRandomBootstapModeSelector.class.getName()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, false, 2, 2,
timestamp, timestamp, false,
Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS));
@@ -472,7 +477,7 @@ public void testMetaAndFullBoostrapCOW() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -509,7 +514,7 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
.withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
.withBootstrapModeSelector(TestRandomBootstapModeSelector.class.getName()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, false, 2, 2,
timestamp, timestamp, false,
Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS));
@@ -527,7 +532,7 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
// Run bootstrap again
client = new HoodieWriteClient(jsc, config);
- client.bootstrap();
+ client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -593,6 +598,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
}
// RO Input Format Read
+ reloadInputFormats();
List records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
.map(f -> basePath + "/" + f).collect(Collectors.toList()),
@@ -609,6 +615,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
assertEquals(totalRecords, seenKeys.size());
//RT Input Format Read
+ reloadInputFormats();
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
@@ -752,10 +759,8 @@ public Map> select(List {
final BootstrapMode mode;
if (currIdx == 0) {
- System.out.println("METADATA bootstrap selected");
mode = BootstrapMode.METADATA_ONLY_BOOTSTRAP;
} else {
- System.out.println("FULL bootstrap selected");
mode = BootstrapMode.FULL_BOOTSTRAP;
}
currIdx = (currIdx + 1) % 2;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
index 4cdc01ece6468..7fc8afb548034 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -18,12 +18,12 @@
package org.apache.hudi.utilities.checkpointing;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieException;
-
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieException;
/**
* Provide the initial checkpoint for delta streamer.
@@ -51,7 +51,13 @@ public InitialCheckPointProvider(TypedProperties props) {
*
* @param config Hadoop configuration
*/
- public abstract void init(Configuration config) throws HoodieException;
+ public void init(Configuration config) throws HoodieException {
+ try {
+ this.fs = FileSystem.get(config);
+ } catch (IOException e) {
+ throw new HoodieException("CheckpointProvider initialization failed");
+ }
+ }
/**
* Get checkpoint string recognizable for delta streamer.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
new file mode 100644
index 0000000000000..17058da7fddc6
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used to set a checkpoint from latest commit of another (mirror) hudi dataset.
+ * Used by integration test.
+ */
+public class InitialCheckpointFromAnotherHoodieTimelineProvider extends InitialCheckPointProvider {
+
+ private HoodieTableMetaClient anotherDsHoodieMetaclient;
+
+ public InitialCheckpointFromAnotherHoodieTimelineProvider(TypedProperties props) {
+ super(props);
+ }
+
+ @Override
+ public void init(Configuration config) throws HoodieException {
+ super.init(config);
+ this.anotherDsHoodieMetaclient = new HoodieTableMetaClient(config, path.toString());
+ }
+
+ @Override
+ public String getCheckpoint() throws HoodieException {
+ return anotherDsHoodieMetaclient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ .map(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(anotherDsHoodieMetaclient.getActiveTimeline().getInstantDetails(instant).get(),
+ HoodieCommitMetadata.class);
+ return commitMetadata.getMetadata(CHECKPOINT_KEY);
+ } catch (IOException e) {
+ return null;
+ }
+ }).filter(Objects::nonNull).findFirst().get();
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
index 8e8af55a3c563..654836c2a68e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
@@ -21,9 +21,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -44,15 +42,6 @@ public KafkaConnectHdfsProvider(TypedProperties props) {
super(props);
}
- @Override
- public void init(Configuration config) throws HoodieException {
- try {
- this.fs = FileSystem.get(config);
- } catch (IOException e) {
- throw new HoodieException("KafkaConnectHdfsProvider initialization failed");
- }
- }
-
/**
* PathFilter for Kafka-Connect-HDFS.
* Directory format: /partition1=xxx/partition2=xxx
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 8a8d6780f4246..c5f5e70b7c934 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -75,6 +75,8 @@
import scala.collection.JavaConversions;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -85,8 +87,6 @@ public class DeltaSync implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
- public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
- public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
/**
* Delta Sync Config.
@@ -260,7 +260,8 @@ private Pair>> readFromSource
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
- } else {
+ } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 725edd5e1d8dc..6f78e29a611f5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,7 +18,7 @@
package org.apache.hudi.utilities.deltastreamer;
-import org.apache.hadoop.hive.conf.HiveConf;
+import java.util.HashMap;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.HoodieWriteClient;
@@ -55,6 +55,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -89,13 +90,14 @@ public class HoodieDeltaStreamer implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
- public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
+ public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
+ public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
private final transient Config cfg;
private final TypedProperties properties;
- private transient DeltaSyncService deltaSyncService;
+ private transient Option deltaSyncService;
private final Option bootstrapExecutor;
@@ -114,22 +116,27 @@ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Con
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
- TypedProperties properties) throws IOException {
+ TypedProperties props) throws IOException {
+ // Resolving the properties first in a consistent way
+ this.properties = props != null ? props : UtilHelpers.readConfig(
+ FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
+ new Path(cfg.propsFilePath), cfg.configs).getConfig();
+
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
- UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, properties);
+ UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, this.properties);
checkPointProvider.init(conf);
cfg.checkpoint = checkPointProvider.getCheckpoint();
}
this.cfg = cfg;
- this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, conf, properties);
- this.properties = properties;
this.bootstrapExecutor = Option.ofNullable(
- cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, properties) : null);
+ cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
+ this.deltaSyncService = Option.ofNullable(
+ cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, this.properties));
}
public void shutdownGracefully() {
- deltaSyncService.shutdown(false);
+ deltaSyncService.ifPresent(ds -> ds.shutdown(false));
}
/**
@@ -143,18 +150,30 @@ public void sync() throws Exception {
bootstrapExecutor.get().execute();
} else {
if (cfg.continuousMode) {
- deltaSyncService.start(this::onDeltaSyncShutdown);
- deltaSyncService.waitForShutdown();
+ deltaSyncService.ifPresent(ds -> {
+ ds.start(this::onDeltaSyncShutdown);
+ try {
+ ds.waitForShutdown();
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
+ }
+ });
LOG.info("Delta Sync shutting down");
} else {
LOG.info("Delta Streamer running only single round");
try {
- deltaSyncService.getDeltaSync().syncOnce();
+ deltaSyncService.ifPresent(ds -> {
+ try {
+ ds.getDeltaSync().syncOnce();
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
+ }
+ });
} catch (Exception ex) {
LOG.error("Got error running delta sync once. Shutting down", ex);
throw ex;
} finally {
- deltaSyncService.close();
+ deltaSyncService.ifPresent(DeltaSyncService::close);
LOG.info("Shut down delta streamer");
}
}
@@ -167,7 +186,7 @@ public Config getConfig() {
private boolean onDeltaSyncShutdown(boolean error) {
LOG.info("DeltaSync shutdown. Closing write client. Error?" + error);
- deltaSyncService.close();
+ deltaSyncService.ifPresent(DeltaSyncService::close);
return true;
}
@@ -410,9 +429,7 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
- this.props = properties != null ? properties : UtilHelpers.readConfig(
- FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
- new Path(cfg.propsFilePath), cfg.configs).getConfig();
+ this.props = properties;
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
@@ -685,9 +702,7 @@ public BootstrapExecutor(Config cfg, JavaSparkContext jssc, FileSystem fs, Confi
this.jssc = jssc;
this.fs = fs;
this.configuration = conf;
- this.props = properties != null ? properties : UtilHelpers.readConfig(
- FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
- new Path(cfg.propsFilePath), cfg.configs).getConfig();
+ this.props = properties;
// Add more defaults if full bootstrap requested
this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL());
@@ -713,8 +728,18 @@ public BootstrapExecutor(Config cfg, JavaSparkContext jssc, FileSystem fs, Confi
public void execute() throws IOException {
initializeTable();
HoodieWriteClient bootstrapClient = new HoodieWriteClient(jssc, bootstrapConfig, true);
- bootstrapClient.bootstrap();
- syncHive();
+
+ try {
+ HashMap checkpointCommitMetadata = new HashMap<>();
+ checkpointCommitMetadata.put(CHECKPOINT_KEY, cfg.checkpoint);
+ if (cfg.checkpoint != null) {
+ checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
+ }
+ bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
+ syncHive();
+ } finally {
+ bootstrapClient.close();
+ }
}
/**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
index e8718558ccfe0..176661b0fe356 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -18,6 +18,9 @@
package org.apache.hudi.utilities.keygen;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -62,6 +65,8 @@ enum TimestampType implements Serializable {
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
private final TimeZone timeZone;
+ protected final boolean encodePartitionPath;
+
/**
* Supported configs.
*/
@@ -108,6 +113,9 @@ public TimestampBasedKeyGenerator(TypedProperties config) {
default:
timeUnit = null;
}
+
+ this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
+ Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
}
@Override
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index c759d0d0769a1..0da29359ea9a5 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -104,6 +104,12 @@
org.apache.hadoop.hbase.
org.apache.hudi.org.apache.hadoop.hbase.
+
+
+
+
+ org.apache.hadoop.hbase.util.VersionInfo
+
org.apache.htrace.