Merged
Changes from all commits (60 commits)
41deec7
Fix timestamp_millis issue
linliu-code Jan 14, 2026
f966a51
Solve more compiling errors
linliu-code Jan 14, 2026
0f28b8c
Fix some bugs
linliu-code Jan 21, 2026
b4fe80a
Fix incremental queries for auto repair
linliu-code Jan 26, 2026
b269ce0
Fix data skipping support
linliu-code Jan 28, 2026
d039cdf
Pass schema from option instead of global configuration for thread sa…
linliu-code Jan 28, 2026
4ebe05e
Address partial comments
linliu-code Jan 28, 2026
4092037
Address more comments
linliu-code Jan 29, 2026
2436a84
changing code parts around HoodieStorage (introduced in 0.15.0.) to u…
Feb 5, 2026
8c7c8a5
Addressed comments
linliu-code Jan 30, 2026
297f657
address wiring comments
linliu-code Jan 30, 2026
48bcddb
Fix hive related tests
Feb 2, 2026
46616bf
Fix tests
Feb 3, 2026
f787b5f
Fix tests
Feb 4, 2026
10d552a
Fix tests
Feb 4, 2026
06c921d
Fix test and address review comments
Feb 4, 2026
a2e4585
Cherry pick bug fixes
Feb 5, 2026
b9e8286
Fix build
Feb 4, 2026
2781335
Fix build
Feb 5, 2026
e9acc81
Revert "Fix tests"
Feb 6, 2026
d538737
Revert "Fix build"
Feb 6, 2026
797e6e5
Fix build issues - 2.11 depending on 2.12 version
Feb 6, 2026
65242d6
Fix build issues - Column stream explicty collection.2
Feb 6, 2026
610d2ce
Fix build issues - Use projection schema instead of repairFileSchema
Feb 6, 2026
2ba229f
Fix build issues - remove spark3.5 profiles from github workflow bot
Feb 7, 2026
2a77ab8
Fix build issues - Fix IT test pom.xml update
Feb 7, 2026
8faab37
Fix build issues - Use projection schema fields from repaired schema
Feb 8, 2026
ffd982e
Fix build issues - throw exception only for spark 3.4 gt
Feb 8, 2026
3b93fb0
Fix build issues - throw exception only for spark 3.3
Feb 8, 2026
f512e6f
Fix spark33 bug
linliu-code Feb 9, 2026
b44fe3a
Address review comments
Mar 3, 2026
ac24e2a
Address review comments
Mar 4, 2026
483ba40
Fix compilation
Mar 6, 2026
0252875
[HUDI-5034] Remove Spark Avro schema post processor (#6956) (#17799)
linliu-code Jan 8, 2026
a060008
refactor: Update ParquetWriteSupport for Rows to match Avro writer be…
the-other-tim-brown Sep 13, 2025
af6bd33
Fix compilation and test failure
Mar 10, 2026
4cd9bf8
Fix compilation
Mar 10, 2026
48c13fc
Fix compilation errors
linliu-code Mar 15, 2026
f77f672
Fix CI failures
linliu-code Mar 15, 2026
14a05a6
Fix compilation error
linliu-code Mar 15, 2026
7d9e9b4
Fix bugs
linliu-code Mar 16, 2026
06ee9ee
Fix test failures
Mar 31, 2026
b3a45e0
Fix test failures
Apr 1, 2026
c4c0a35
[HUDI-7812] Disabling row writer for clustering (#11360)
nsivabalan May 29, 2024
1ec3577
Fix input source modification time
Apr 1, 2026
8f32825
Revert "Fix input source modification time"
Apr 1, 2026
71148c0
Fix tests
Apr 1, 2026
d88597d
Fix compilation
Apr 2, 2026
b3f6fde
Fix test failure
Apr 2, 2026
cab0ab1
Use supportBatch instead
Apr 3, 2026
6db2a16
Add optimisation for merged read handle
Apr 7, 2026
990ea48
Address review comments
Apr 8, 2026
8c9b54f
Addressing feedback and optimizing schema parsing
nsivabalan Apr 9, 2026
7595d6d
Fix compilation
Apr 9, 2026
a2ff34c
More fixes and test fix
Apr 9, 2026
bd54b98
Other fixes
Apr 9, 2026
3a41101
fix compilation
Apr 9, 2026
63a4781
Fix compilation
Apr 9, 2026
6e221eb
Fix compilation
Apr 9, 2026
7aec228
Fix test failures
Apr 9, 2026
82 changes: 73 additions & 9 deletions .github/workflows/bot.yml
@@ -177,29 +177,93 @@ jobs:
java-version: '17'
distribution: 'adopt'
architecture: x64
cache: maven
- name: Verify Java 17 version
run: |
echo "JAVA_HOME: $JAVA_HOME"
java -version
which java
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- name: UT - Common & Spark
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- name: Java UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: FT - Spark
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Java FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
if: ${{ !endsWith(env.SPARK_PROFILE, '3.4') }} # skip test spark 3.4 as it's covered by Azure CI
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java17-scala-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.3"
sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Verify Java 17 version
run: |
echo "JAVA_HOME: $JAVA_HOME"
java -version
which java
- name: Scala UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Scala FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-flink:
runs-on: ubuntu-latest
strategy:
6 changes: 3 additions & 3 deletions azure-pipelines-20230430.yml
@@ -47,7 +47,7 @@ parameters:
default:
- 'hudi-spark-datasource'
- 'hudi-spark-datasource/hudi-spark'
- 'hudi-spark-datasource/hudi-spark3.2.x'
- 'hudi-spark-datasource/hudi-spark3.4.x'
- 'hudi-spark-datasource/hudi-spark3.2plus-common'
- 'hudi-spark-datasource/hudi-spark3-common'
- 'hudi-spark-datasource/hudi-spark-common'
@@ -73,7 +73,7 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.18.x'
- '!hudi-spark-datasource'
- '!hudi-spark-datasource/hudi-spark'
- '!hudi-spark-datasource/hudi-spark3.2.x'
- '!hudi-spark-datasource/hudi-spark3.4.x'
- '!hudi-spark-datasource/hudi-spark3.2plus-common'
- '!hudi-spark-datasource/hudi-spark3-common'
- '!hudi-spark-datasource/hudi-spark-common'
@@ -97,7 +97,7 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.18.x'

variables:
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.18'
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.4 -Dflink1.18'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
MVN_OPTS_INSTALL: '-Phudi-platform-service -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS) -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=5'
MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
7 changes: 7 additions & 0 deletions hudi-cli/pom.xml
@@ -143,6 +143,13 @@
</exclusions>
</dependency>

<!-- Validation API - override old 1.1.0 version from transitive dependencies -->
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>

<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
46 changes: 46 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
@@ -47,6 +47,48 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
<!-- Exclude Jetty from timeline-service to use our managed version -->
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Jetty: Explicitly declare all Jetty dependencies to ensure version alignment -->
<!-- This is critical when running in Spark/Hadoop environments that may have older Jetty versions -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-xml</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
</dependency>

<dependency>
@@ -188,6 +230,10 @@
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -18,6 +18,7 @@

package org.apache.hudi.index;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -241,8 +242,11 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getHadoopConf(),
() -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema));
return partitionLocations.flatMap(p
-> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getPartitionPath(), p.getFileId()))
-> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getPartitionPath(), p.getFileId()), hasTimestampFields)
.getMergedRecords().iterator());
}
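For context on the check wired in above: a minimal sketch of what a timestamp-millis field test along the lines of AvroSchemaUtils.hasTimestampMillisField could look like. This is a hypothetical illustration rather than the PR's implementation, and it assumes non-recursive Avro schemas.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public final class TimestampMillisCheck {
  // Hypothetical helper: true if any field (including union/array/map/nested record members)
  // carries the timestamp-millis logical type.
  public static boolean hasTimestampMillisField(Schema schema) {
    switch (schema.getType()) {
      case RECORD:
        return schema.getFields().stream().anyMatch(f -> hasTimestampMillisField(f.schema()));
      case UNION:
        return schema.getTypes().stream().anyMatch(TimestampMillisCheck::hasTimestampMillisField);
      case ARRAY:
        return hasTimestampMillisField(schema.getElementType());
      case MAP:
        return hasTimestampMillisField(schema.getValueType());
      default:
        return schema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    }
  }
}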

@@ -19,6 +19,7 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -54,14 +55,16 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K
protected final Schema readerSchema;
protected final Schema baseFileReaderSchema;

public HoodieMergedReadHandle(HoodieWriteConfig config,
Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
public HoodieMergedReadHandle(HoodieWriteConfig config, Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable, Pair<String, String> partitionPathFileIDPair,
boolean hasTimestampFields) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
this.baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
// Repair reader schema.
Collaborator: This would also be executed in the executor and we can probably optimise by adding a flag in the caller. Could not validate though.

Contributor: Let's fix this; it should not take much effort. Let's fix in 0.15.1 if need be.

Collaborator: Addressed in #18478 for 0.15.
// Assume writer schema should be correct. If not, no repair happens.
readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema;
}

public List<HoodieRecord<T>> getMergedRecords() {
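The repair above replaces the reader schema with one aligned to the (trusted) writer schema when timestamp fields are involved. A rough sketch of one way such a repair could work, assuming flat records and Avro 1.9+; the actual AvroSchemaUtils.getRepairedSchema in this PR may behave differently.

import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public final class SchemaRepairSketch {
  // Hypothetical: where the writer schema declares timestamp-millis and the reader schema
  // only has a plain long, take the writer's field schema so values decode consistently.
  public static Schema repair(Schema reader, Schema writer) {
    List<Field> repaired = reader.getFields().stream().map(f -> {
      Field w = writer.getField(f.name());
      boolean writerIsTsMillis = w != null
          && w.schema().getLogicalType() instanceof LogicalTypes.TimestampMillis;
      boolean readerIsPlainLong = f.schema().getType() == Schema.Type.LONG
          && f.schema().getLogicalType() == null;
      Schema fieldSchema = (writerIsTsMillis && readerIsPlainLong) ? w.schema() : f.schema();
      return new Field(f.name(), fieldSchema, f.doc(), f.defaultVal());
    }).collect(Collectors.toList());
    return Schema.createRecord(reader.getName(), reader.getDoc(),
        reader.getNamespace(), reader.isError(), repaired);
  }
}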
@@ -41,6 +41,7 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
@@ -54,7 +55,7 @@
*/
public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
public enum TimestampType implements Serializable {
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS, SCALAR
}

private final TimeUnit timeUnit;
@@ -93,6 +94,9 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException
case EPOCHMILLISECONDS:
timeUnit = MILLISECONDS;
break;
case EPOCHMICROSECONDS:
timeUnit = MICROSECONDS;
break;
case UNIX_TIMESTAMP:
timeUnit = SECONDS;
break;
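A hedged usage sketch for the new EPOCHMICROSECONDS type: configuring the key generator to partition on an epoch-in-microseconds field. The property names are assumed from TimestampKeyGeneratorConfig and should be verified against your Hudi version; only the enum value itself comes from this diff.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;

public class MicrosKeyGenExample {
  public static void main(String[] args) throws Exception {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.datasource.write.recordkey.field", "id");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "event_time_us");
    // Assumed key names; the partition field holds an epoch value in microseconds.
    props.setProperty("hoodie.keygen.timebased.timestamp.type", "EPOCHMICROSECONDS");
    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd");
    TimestampBasedAvroKeyGenerator keyGen = new TimestampBasedAvroKeyGenerator(props);
    // keyGen.getKey(record) would now interpret event_time_us as microseconds since epoch.
  }
}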
@@ -45,6 +45,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -86,7 +87,12 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieFileReader bootstrapFileReader = null;

Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
Schema readerSchema = baseFileReader.getSchema();
Schema readerSchema;
if (!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(hadoopConf, () -> true)) {
readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
} else {
readerSchema = baseFileReader.getSchema();
}

// In case Advanced Schema Evolution is enabled we might need to rewrite currently
// persisted records to adhere to an evolved schema
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
@@ -129,6 +130,12 @@ public HoodieData<WriteStatus> compact(
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
// Since we are using merge handle here, we can directly query the write schema from conf
// Write handle provides an option to use overridden write schema as well which is not used by merge handle
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
if (!table.isMetadataTable()) {
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getHadoopConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema));
}
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
.flatMap(List::iterator);
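The block above computes the write schema once and records whether timestamp repair is needed, so that per-operation compaction tasks do not repeat the check. A hypothetical sketch of that set-once / read-everywhere pattern on a Hadoop Configuration; the property name and helpers below are assumptions, not the PR's actual AvroSchemaUtils API.

import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;

public final class RepairFlagSketch {
  // Assumed property name, for illustration only.
  private static final String KEY = "hoodie.internal.timestamp.repair.needed";

  // Compute the flag lazily and store it only if it has not been set yet.
  public static void setIfNotSet(Configuration conf, Supplier<Boolean> compute) {
    if (conf.get(KEY) == null) {
      conf.setBoolean(KEY, compute.get());
    }
  }

  // Read the flag, falling back to the supplied check when nothing was recorded.
  public static boolean isNeeded(Configuration conf, Supplier<Boolean> fallback) {
    String value = conf.get(KEY);
    return value != null ? Boolean.parseBoolean(value) : fallback.get();
  }
}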
24 changes: 24 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -169,6 +169,10 @@
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -253,6 +257,26 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<id>add-spark32plus-parquet-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<skipAddSource>${spark31orEarlier}</skipAddSource>
<sources>
<source>src/parquet/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<resources>
@@ -116,7 +116,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
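Since this diff flips the clustering row-writer default to false, jobs that relied on the previous behavior would have to opt back in explicitly. A sketch under the assumption that the standard HoodieWriteConfig builder is used; the config key is the one shown in the diff above.

import java.util.Properties;
import org.apache.hudi.config.HoodieWriteConfig;

public class RowWriterOptIn {
  public static HoodieWriteConfig build(String basePath) {
    Properties props = new Properties();
    // Re-enable the row writer for clustering after this PR changes the default to false.
    props.setProperty("hoodie.datasource.write.row.writer.enable", "true");
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable("my_table")
        .withProps(props)
        .build();
  }
}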