[OSS PR #18599] feat(lance): Add VariantType support to Lance base files #30
Open
hudi-agent wants to merge 70 commits into
Conversation
…che#18508) * fix: whitelist Flink _2.12 artifacts in scala-2.13 enforcer rule - Flink only publishes flink-table-planner_2.12 (no _2.13 variant exists). The scala-2.13 profile's blanket ban on *_2.12 artifacts breaks the build when combining -Dspark4.0 -Dscala-2.13 with -Dflink1.20. - Add <includes> exceptions for org.apache.flink:*_2.12 and its transitive _2.12 dependencies (org.scala-lang.modules, com.twitter) since Flink has largely decoupled from Scala since 1.15 and the _2.12 suffix is internal. * fix: tighten Flink _2.12 whitelist and add rationale comment Address review feedback on apache#18508: - Narrow scala-lang.modules and com.twitter includes to the specific transitives (scala-xml_2.12, chill_2.12) so the enforcer still catches unexpected _2.12 leakage from those groups. - Add an XML comment explaining why these _2.12 artifacts are whitelisted despite the blanket scala-2.13 ban (Flink's _2.12 suffix is a legacy naming artifact post 1.15 Scala decoupling).
…int.sh (apache#18527) - The property was inserted into yarn-site.xml twice with the same value in the MULTIHOMED_NETWORK=1 block. - Duplicates are harmless at runtime (Hadoop's Configuration parser takes the last value for duplicates and both writes use the same value), but the second write is dead code. - Applies to base, base_java11, and base_java17.
- MapUtils.isNullOrEmpty(Map) was byte-identical to CollectionUtils.isNullOrEmpty(Map), and MapUtils.nonEmpty(Map) had no matching overload in CollectionUtils. - Move the sole remaining unique helper (containsAll) into CollectionUtils, delete MapUtils, and fold its test into TestCollectionUtils so there is one canonical utility class for Map/Collection emptiness and containment checks.
…bleList (apache#18530) - Replace Stream.of(elements).collect(Collectors.toList()) with a direct Arrays.asList copy. - Same semantics (mutable snapshot wrapped in an unmodifiable view), one less stream/spliterator/collector allocation per call. - Called from constants/initializers, so the saving is per-call but free.
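For illustration, a minimal before/after sketch of the allocation difference described above (class and variable names here are made up, not from the Hudi patch):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnmodifiableListSketch {
  public static void main(String[] args) {
    String[] elements = {"a", "b", "c"};

    // Before: a Stream, spliterator, and Collector are allocated on every call.
    List<String> viaStream = Collections.unmodifiableList(
        Stream.of(elements).collect(Collectors.toList()));

    // After: Arrays.asList wraps the array directly; unmodifiableList adds the read-only view.
    List<String> viaArraysAsList = Collections.unmodifiableList(Arrays.asList(elements));

    System.out.println(viaStream.equals(viaArraysAsList)); // true: same contents, fewer allocations
  }
}
```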
…pache#18083) * Implemented a continuous sorting mode for the append sink that maintains sorted order incrementally and avoids single-partition lag during ingestion by reducing the long pauses caused by sorting and backpressure.
Summary:
- Added AppendWriteFunctionWithContinuousSort, which keeps records in a TreeMap keyed by a code-generated normalized key and an insertion sequence, drains the oldest entries when a configurable threshold is reached, and writes drained records immediately; snapshot/endInput drain the remaining records.
- Updated AppendWriteFunctions.create to instantiate the continuous sorter when WRITE_BUFFER_SORT_CONTINUOUS_ENABLED is true.
- Introduced three new FlinkOptions: WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_THRESHOLD_PERCENT, and WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, and added runtime validation (buffer > 0, 0 < threshold < 100, drainSize > 0, parsed non-empty sort keys).
- Added ITTestAppendWriteFunctionWithContinuousSort integration tests covering buffer flush triggers, sorted output correctness (with and without continuous drain), drain threshold/size behaviors, and invalid-parameter error cases.
--------- Co-authored-by: dsaisharath <dsaisharath@uber.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
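A self-contained sketch of the buffer-and-drain idea described in the commit above; the key composition, drain trigger, and types are simplified stand-ins, not the actual AppendWriteFunctionWithContinuousSort code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Records are kept sorted by (normalizedKey, insertionSeq); once the buffer reaches a
// threshold, the oldest (smallest-key) entries are drained and written immediately.
public class ContinuousSortBufferSketch {
  private final TreeMap<String, String> buffer = new TreeMap<>();
  private final int drainThreshold;
  private final int drainSize;
  private long seq = 0;

  public ContinuousSortBufferSketch(int drainThreshold, int drainSize) {
    this.drainThreshold = drainThreshold;
    this.drainSize = drainSize;
  }

  /** Buffers one record; returns any records drained in sorted order. */
  public List<String> add(String normalizedKey, String record) {
    // Zero-padded sequence keeps insertion order stable for equal sort keys.
    buffer.put(String.format("%s#%019d", normalizedKey, seq++), record);
    return buffer.size() >= drainThreshold ? drain(drainSize) : new ArrayList<>();
  }

  /** In the real function this would run on snapshot/endInput: flush everything left. */
  public List<String> drainAll() {
    return drain(buffer.size());
  }

  private List<String> drain(int count) {
    List<String> drained = new ArrayList<>(count);
    for (int i = 0; i < count && !buffer.isEmpty(); i++) {
      drained.add(buffer.pollFirstEntry().getValue());
    }
    return drained;
  }
}
```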
…apache#18204) - Added HudiHiveSyncJob under hudi-utilities as an external runner for Hive sync. - Added CLI/config support for base path, base file format, props file, and override configs. - Wired the job to build sync properties and invoke HiveSyncTool directly. --------- Co-authored-by: sivabalan <n.siva.b@gmail.com>
…() (apache#18461) * fix: close HoodieStorage resource in FileSystemBasedLockProvider.close() Adds a finally block to properly close the HoodieStorage instance after the lock file is deleted, preventing resource leaks. Fixes apache#14922 * fix: simplify log message per review feedback Addresses nit from @yihua: shorten log to "Failed to close HoodieStorage" since logger already includes class context. * fix: add comment explaining HoodieStorage.close() is no-op for HadoopStorage Addresses review feedback from danny0405: HoodieHadoopStorage.close() is a no-op because Hadoop FileSystem instances are shared within the JVM process lifecycle. Added a comment to explain this behavior and why the call is still retained for interface contract correctness.
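A minimal sketch of the close-in-finally pattern the fix describes, with a placeholder StorageHandle standing in for HoodieStorage (the real logic lives in FileSystemBasedLockProvider):

```java
import java.io.IOException;

public class LockProviderCloseSketch {
  /** Placeholder for HoodieStorage. */
  interface StorageHandle extends AutoCloseable {
    void deleteLockFile() throws IOException;
  }

  private final StorageHandle storage;

  public LockProviderCloseSketch(StorageHandle storage) {
    this.storage = storage;
  }

  public void close() {
    try {
      storage.deleteLockFile();
    } catch (IOException e) {
      System.err.println("Failed to delete lock file: " + e.getMessage());
    } finally {
      try {
        // Released even if the delete fails, which is what prevents the resource leak.
        // For HoodieHadoopStorage this is effectively a no-op (shared FileSystem instances),
        // but the call is kept for interface-contract correctness, per the review note above.
        storage.close();
      } catch (Exception e) {
        System.err.println("Failed to close HoodieStorage");
      }
    }
  }
}
```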
…lters (apache#18531) * perf(common): Avoid double-iterating log files in file-system-view filters - filterUncommittedFiles and filterUncommittedLogs each called fileSlice.getLogFiles() twice: once to filter+collect into committedLogFiles, and again for .count() to compare sizes. - Each call produced a fresh stream over the underlying collection. - Materialize the log files once and compare against the resulting list size. - Also switch fileSlices.size() == 0 to isEmpty() in fetchAllLogsMergedFileSlice. - No behaviour changes. * perf(common): add FileSlice#getLogFileCnt and drop intermediate list - Address review comment on apache#18531: - The backing TreeSet's size() is O(1), so expose it as FileSlice#getLogFileCnt and use it for the size comparison in filterUncommittedFiles / filterUncommittedLogs. - This removes the allLogFiles materialization and keeps a single stream pass for the filter.
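A simplified sketch of the single-pass comparison described above, with a plain TreeSet of paths standing in for the FileSlice log-file set:

```java
import java.util.List;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class UncommittedLogFilterSketch {
  private final TreeSet<String> logFiles = new TreeSet<>();

  public void addLogFile(String path) {
    logFiles.add(path);
  }

  // Analogous to FileSlice#getLogFileCnt: the backing TreeSet's size() is O(1).
  public int getLogFileCnt() {
    return logFiles.size();
  }

  public boolean hasUncommittedLogFiles(Predicate<String> isCommitted) {
    // One stream pass to filter; the former second pass (.count()) is replaced by size().
    List<String> committed = logFiles.stream().filter(isCommitted).collect(Collectors.toList());
    return committed.size() != getLogFileCnt();
  }
}
```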
…pache#18488) * feat(vector): Add Spark SQL DDL CREATE TABLE support for VECTOR type - Enable VECTOR(dim[, elementType]) syntax in Spark SQL DDL so users can create tables with vector columns directly via SQL instead of only through the DataFrame API. - Changes: - Extend ANTLR grammar to accept identifier params in primitiveDataType - Add VECTOR case in visitPrimitiveDataType (supports FLOAT, DOUBLE, INT8) - Add VECTOR metadata attachment in addMetadataForType - Add DDL tests for VECTOR columns in TestCreateTable * Fix tests and address comments * Improve VECTOR DDL test coverage with targeted tests and routing - Relax isHoodieCommand VECTOR check from " vector(" to " vector" in all 4 extended parser files. - The stricter " vector(" variant only routes SQL containing VECTOR type declarations with parentheses (e.g. VECTOR(128)), which means VECTOR without parens is delegated to Spark's native parser and never reaches our Hudi code path. - Relaxing to " vector" routes all VECTOR-related SQL through our parser, enabling us to exercise the "vector with empty params" branch of the `case ("vector", _ :: _)` pattern - previously reported as partial coverage because the empty-list side of the `_ :: _` check was never hit. - This is also consistent with the existing BLOB routing pattern " blob". Add two targeted tests: 1. test create table with INT8 VECTOR column - isolated INT8 test that independently exercises the `case INT8 => ByteType` branch 2. test create table with VECTOR without dimension fails - routes VECTOR alone through the Hudi parser to cover the empty-list branch
* [MINOR] Bump lance to 4.0.0 and lance-spark to 0.4.0 Bumps lance-core from 1.0.2 to 4.0.0 and lance-spark connector from 0.0.15 to 0.4.0. Updates affected import paths and adapts to the LanceArrowUtils.toArrowSchema signature change (drops the errorOnDuplicatedFieldNames parameter). * [MINOR] Rename Hudi's ShowIndexes logical plan to HoodieShowIndexes Lance-spark 0.4.0 (bumped in 7e4967c) ships its own `org.apache.spark.sql.catalyst.plans.logical.ShowIndexes` inside `lance-spark-base_*.jar`. This collides with Hudi's own same-FQCN case class (added in hudi-spark-common). Both jars end up on the classpath of hudi-spark3.3.x/3.4.x/3.5.x/4.0.x, and since the two classes have different case-class arity (Lance's is 1-arg, Hudi's is 2-arg), Scala pattern matches like `case ShowIndexes(table, output)` fail to compile. Rename Hudi's class to `HoodieShowIndexes` (and its companion object) to sidestep the collision. This is an internal logical-plan class consumed only by Hudi's own parser / CatalystPlanUtils / analyzer — no public SQL or API surface changes. Call-sites updated: - Index.scala (definition + companion) - HoodieSpark{33,34,35,40}CatalystPlanUtils.scala (pattern match) - HoodieSpark{3_3,3_4,3_5,4_0}ExtendedSqlAstBuilder.scala (construct) - IndexCommands.scala (doc reference) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(lance): look up Arrow vectors by field name in LanceRecordIterator With lance-spark 0.4.0, VectorSchemaRoot.getFieldVectors() returns vectors in the file's on-disk order rather than in the order of the projection requested via LanceFileReader.readAll(). Wrapping vectors positionally therefore mismatches the UnsafeProjection built from the requested schema, causing UnsafeProjection to call type accessors on the wrong column (e.g. getInt on a VarCharVector) and fail with UnsupportedOperationException for MoR reads where the FileGroupRecord Buffer rearranges columns relative to the file's write order. Fix by looking up each vector by field name from the requested schema so the ColumnVector[] order matches what UnsafeProjection expects. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [MINOR] Tighten comments on LanceRecordIterator and HoodieShowIndexes Shorten the prose blocks above the column-order remapping in LanceRecordIterator and above HoodieShowIndexes to 2-3 sentences each, keeping the why (lance-spark 0.4.0 on-disk column order; FQCN shadow from lance-spark-base) without the full incident narrative. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…es (apache#18380) This PR adds support to block archival based on the Earliest Commit To Retain (ECTR) from the last completed clean operation, preventing potential data leaks when cleaning configurations change between clean and archival runs. Problem: Currently, archival recomputes ECTR independently based on cleaning configs at archival time, rather than reading it from the last clean plan. When cleaning configs change between clean and archival operations, archival may archive commits whose data files haven't been cleaned yet, leading to timeline metadata loss for existing data files. Summary and Changelog User-facing summary: Users can now optionally enable archival blocking based on ECTR from the last clean to prevent archiving commits whose data files haven't been cleaned. This is useful when cleaning configurations may change over time or when strict data retention guarantees are needed. Detailed changelog: Configuration Changes: Added new advanced config hoodie.archive.block.on.latest.clean.ectr (default: false) When enabled, archival reads ECTR from last completed clean metadata Blocks archival of commits with timestamp >= ECTR Marked as advanced config for power users Available since version 1.2.0 Implementation Changes: TimelineArchiverV1.java: Added ECTR blocking logic in getCommitInstantsToArchive() method Reads ECTR from last completed clean's metadata (lines 274-294) Filters commit timeline to exclude commits >= ECTR (lines 322-326) Follows same pattern as existing compaction/clustering retention checks Includes error handling with graceful degradation (logs warning if metadata read fails) HoodieArchivalConfig.java: Added config property BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR Builder method: withBlockArchivalOnCleanECTR(boolean) HoodieWriteConfig.java: Added access method shouldBlockArchivalOnCleanECTR()
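A rough sketch of the blocking rule described above, relying on the fact that Hudi instant times are lexicographically ordered strings; the method and parameter names are illustrative, not the TimelineArchiverV1 API:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class EctrArchivalFilterSketch {
  /**
   * When the feature is enabled and the last completed clean recorded an
   * earliestCommitToRetain (ECTR), drop archival candidates at or after it.
   */
  public static List<String> filterArchivable(List<String> candidateInstantTimes,
                                              Optional<String> ectrFromLastClean,
                                              boolean blockOnCleanEctr) {
    if (!blockOnCleanEctr || !ectrFromLastClean.isPresent()) {
      return candidateInstantTimes;
    }
    String ectr = ectrFromLastClean.get();
    return candidateInstantTimes.stream()
        .filter(instantTime -> instantTime.compareTo(ectr) < 0) // keep only commits strictly before ECTR
        .collect(Collectors.toList());
  }
}
```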
* fix: prevent parseTypeDescriptor crash for non-custom logical types in schema conversion guards
- The BLOB/VECTOR guard conditions in HoodieSparkSchemaConverters called parseTypeDescriptor() for any StructType/ArrayType with hudi_type metadata, which threw IllegalArgumentException for types like VARIANT that are not custom logical types.
- Add isCustomLogicalTypeDescriptor() safe check to short-circuit the guards before parseTypeDescriptor() is called.
- Add regression test that reproduces the struct+metadata VARIANT path.
* fix: treat hudi_type=VARIANT as a first-class custom logical type
- Address review feedback on apache#18510: restructure the crash fix so a StructType tagged with hudi_type=VARIANT is handled consistently with BLOB/VECTOR.
- The hudi_type metadata is the deliberate escape hatch for engines without a native representation (notably Spark 3.5), so using it is itself the custom-logical-type signal.
- Add VARIANT to CUSTOM_LOGICAL_TYPES and give it a case in parseTypeDescriptor, mirroring BLOB.
- In HoodieSparkSchemaConverters, add a dedicated VARIANT pattern case that validates the expected unshredded structure ({metadata, value} binary fields) and produces HoodieSchema.Variant.
- On Spark 4.0+ the column round-trips as native VariantType via the existing reverse conversion path.
- Remove the isCustomLogicalTypeDescriptor short-circuit helper; with VARIANT now properly registered, the BLOB/VECTOR guards no longer need the pre-check.
- Add unit tests for parseTypeDescriptor VARIANT (success, case insensitivity, parameter rejection) and integration tests asserting VARIANT promotion and malformed-struct rejection.
* Address missing code coverage complaints
…#18511) - Hive 2.x/3.x does not support VARIANT type natively. - When creating a Hudi table with VARIANT columns via SQL CREATE TABLE, Spark's HiveClient passes "variant" as a literal type string which Hive rejects. - Convert VariantType to struct<value:binary, metadata:binary> in the CatalogTable schema before passing to HiveClient, while preserving the original VariantType in table properties so Spark can reconstruct it when reading. - Includes unit test for the conversion. - Recursively convert VariantType inside nested StructType/ArrayType/MapType so columns like STRUCT<a:VARIANT>, ARRAY<VARIANT>, and MAP<STRING,VARIANT> are also rewritten to the Hive-compatible physical struct. - Emit the variant struct with canonical (metadata, value) field order to match HoodieSchema.createVariant() and the Parquet/Iceberg convention. - Extract buildHiveCompatibleCatalogTable helper so the schema conversion and property merge are directly unit-testable. - Expand TestVariantDataType with nested-variant cases, canonical-order assertions, and coverage for buildHiveCompatibleCatalogTable. - Clean up Scaladoc (use backticks) and rename the test field from v to variant_col.
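A hedged sketch of the recursive rewrite described above, using Spark's public type API; the isVariant check is an approximation (the real code matches Spark's VariantType directly), and the field order follows the canonical (metadata, value) convention:

```java
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class VariantToHiveStructSketch {

  // Rewrites variant columns (including nested occurrences) to a Hive-compatible binary struct.
  static DataType toHiveCompatible(DataType dt) {
    if (isVariant(dt)) {
      return DataTypes.createStructType(new StructField[] {
          DataTypes.createStructField("metadata", DataTypes.BinaryType, true),
          DataTypes.createStructField("value", DataTypes.BinaryType, true)});
    }
    if (dt instanceof StructType) {
      StructField[] fields = ((StructType) dt).fields();
      StructField[] rewritten = new StructField[fields.length];
      for (int i = 0; i < fields.length; i++) {
        rewritten[i] = DataTypes.createStructField(
            fields[i].name(), toHiveCompatible(fields[i].dataType()), fields[i].nullable());
      }
      return DataTypes.createStructType(rewritten);
    }
    if (dt instanceof ArrayType) {
      ArrayType at = (ArrayType) dt;
      return DataTypes.createArrayType(toHiveCompatible(at.elementType()), at.containsNull());
    }
    if (dt instanceof MapType) {
      MapType mt = (MapType) dt;
      return DataTypes.createMapType(mt.keyType(), toHiveCompatible(mt.valueType()), mt.valueContainsNull());
    }
    return dt;
  }

  // Approximation: Spark 4's VariantType reports "variant" as its type name.
  static boolean isVariant(DataType dt) {
    return "variant".equalsIgnoreCase(dt.typeName());
  }
}
```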
…8448) Closes apache#18050 In multi-writer mode, when multiple writers detect a failed inflight commit, each writer independently schedules and executes its own rollback. This leads to duplicate rollback instants on the timeline for the same failed commit — causing unnecessary work, potential conflicts, and timeline clutter. Summary and Changelog Adds a mechanism to avoid duplicate rollback plans in multi-writer mode by: Timeline reload under lock: Before scheduling a new rollback, the writer reloads the active timeline inside the lock to check if another writer already scheduled a rollback for the same failed commit. If found, it reuses the existing plan instead of creating a new one. Heartbeat-based ownership: Before executing a rollback, the writer acquires a heartbeat for the rollback instant under a transaction lock. If another writer already holds an active heartbeat (i.e., is currently executing the rollback), the current writer skips execution. If the heartbeat is expired, the writer takes ownership and proceeds. Completed rollback detection: Inside the heartbeat acquisition lock, the writer also checks if the rollback was already completed on the timeline by another writer, and skips if so. Changes: BaseHoodieTableServiceClient: Refactored rollback() into schedule and execute phases. Extracted resolveOrScheduleRollback() (reuses existing pending rollbacks or schedules new ones under lock) and acquireRollbackHeartbeatIfMultiWriter() (heartbeat-based ownership with completed-rollback detection). Wrapped heartbeatClient.stop() in try-catch in the finally block to avoid masking rollback exceptions. HoodieWriteConfig: New advanced config hoodie.rollback.avoid.duplicate.plan (default false), gated on multi-writer mode via shouldAvoidDuplicateRollbackPlan(). TestClientRollback: Added 6 tests covering: expired heartbeat takeover, active heartbeat skip, commit-not-in-timeline, first-writer-schedules-new-plan, already-completed-by-another-writer, and concurrent two-writer rollback of the same commit. Impact New advanced config: hoodie.rollback.avoid.duplicate.plan — opt-in, default false, only effective in multi-writer mode. No breaking changes to public APIs or storage format. No behavioral change for single-writer mode or when the config is disabled. --------- Co-authored-by: Lokesh Jain <ljain@Lokeshs-MacBook-Pro.local> Co-authored-by: sivabalan <n.siva.b@gmail.com>
…he#18552) * fix: Parquet small-precision decimals decode ClassCastException Parquet encodes DECIMAL physically as INT32 (precision <= 9), INT64 (precision <= 18), or BINARY / FIXED_LEN_BYTE_ARRAY. The Hudi-Flink 1.18 ParquetDecimalVector predated this and unconditionally cast the child vector to BytesColumnVector inside getDecimal(), throwing ClassCastException whenever the reader materialized a small-precision decimal as a HeapIntVector / HeapLongVector. Sync ParquetDecimalVector with Apache Flink 2.1's implementation: - getDecimal dispatches on the physical child vector type (ParquetSchemaConverter#is32BitDecimal / is64BitDecimal) and decodes int / long / bytes accordingly. - Wrapper additionally implements WritableLongVector, WritableIntVector, and WritableBytesVector by delegation so that column readers can write through it without unwrapping. - Underlying vector field is now private; accessed via getVector(). Migrated ArrayColumnReader's nine direct-field accesses. - Adds TestParquetDecimalVector covering every dispatch path, the backward-compatible bytes-at-small-precision case, unsupported vector types, null handling, and the Writable* delegation contracts.
…le format (apache#18162) Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
… for RocksDBIndexBackend (apache#18560)
…ache#18545) Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
…n Spark 4 (apache#18564) Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
…che#18571) RECURSION_OVERFLOW_SCHEMA passes new byte[0] (via getUTF8Bytes("")) as the default value of a BYTES HoodieSchemaField, which wraps an Avro Schema.Field. Avro 1.12.0's Schema.validateDefault rejects byte[] for BYTES defaults — it now requires a String (interpreted as ISO-8859-1 bytes, Avro's canonical JSON form for BYTES defaults). Under 1.11.x the validator was lenient.
Because the failure is in a static initializer, the JVM caches the ExceptionInInitializerError and every subsequent class access throws NoClassDefFoundError: Could not initialize class org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil$AvroSupport. Any downstream service loading this class (e.g. DeltaStreamer using ProtoClassBasedSchemaProvider) is permanently broken on JVMs that have Avro 1.12+ on the classpath, even though Hudi itself still pins 1.11.4.
Fix: use "" (empty string) instead of getUTF8Bytes(""). The default value is never read at runtime — it is always overwritten by ByteBuffer.wrap(messageValue.toByteArray()) at line ~365 before the field is populated — and "" serializes bit-for-bit identically to new byte[0] on the wire (both produce "default":"" in the JSON schema), so behavior is strictly preserved under Avro 1.11 while satisfying 1.12's stricter validator.
Rejected alternatives (experimental testing against both Avro versions):

| default arg | 1.11.x | 1.12.0 |
| --- | --- | --- |
| new byte[0] (current) | OK | FAIL |
| "" (fix) | OK | OK |
| ByteBuffer.wrap(...) | OK | FAIL |
| null | OK | OK, but strips the default entirely |

Also drops the now-unused getUTF8Bytes static import.
Fixes: apache#18569
Signed-off-by: tiennguyen-onehouse <tien@onehouse.ai>
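To make the wire-format point concrete, here is a small standalone Avro example (the field and record names are made up) showing a BYTES field whose default is the empty string; in Avro's canonical JSON form, BYTES defaults are strings interpreted as ISO-8859-1, so "" is the empty byte array:

```java
import java.util.Collections;
import org.apache.avro.Schema;

public class BytesDefaultSketch {
  public static void main(String[] args) {
    // A BYTES field defaulting to the empty string; accepted by both Avro 1.11.x and 1.12.x validators.
    Schema.Field bytesField = new Schema.Field(
        "payload", Schema.create(Schema.Type.BYTES), "placeholder default, overwritten at runtime", "");
    Schema record = Schema.createRecord(
        "RecursionOverflowSketch", null, "org.example", false, Collections.singletonList(bytesField));
    // The printed JSON contains "default" : "", i.e. the empty byte array on the wire.
    System.out.println(record.toString(true));
  }
}
```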
apache#18570) HoodieFileGroupReaderBasedFileFormat.buildReaderWithPartitionValues builds two schemas side-by-side: requestedSchema (what to return to Spark) and dataSchema (what to read from parquet). It augments requestedSchema with any partition fields in mandatoryFields before pruning, but pipes dataStructType through unchanged. Spark's dataStructType excludes partition columns by convention, and HoodieSchemaUtils.pruneDataSchema iterates over its second arg, so any mandatory partition field is silently dropped from the resulting dataSchema. The FileGroupReader then does not read the column from the parquet base file, and for non-projection-compatible CUSTOM mergers (e.g. PostgresDebeziumAvroPayload) the output converter writes null for every affected row. Most visible on MOR file slices that have both a base file and a log file, since the readBaseFile path (which would append partition values from the directory name) is skipped in favor of the FileGroupReader path. Regression introduced by apache#13711 ("Improve Logical Type Handling on Col Stats"), which added the pruneDataSchema wrapping but only on the requested-schema side. Fix: mirror requestedStructType's construction — augment dataStructType with the mandatory partition fields before pruning. Also adds a regression test (TestFileGroupReaderPartitionColumn) that reproduces the scenario end-to-end: MOR + CustomKeyGenerator + PostgresDebeziumAvroPayload + GLOBAL_SIMPLE with update.partition.path=true, round-2 partition-key change producing a base+log slice, then verifies untouched records in that slice read back with the correct partition-column value. Fixes: apache#18568 Signed-off-by: tiennguyen-onehouse <tien@onehouse.ai>
…he#18379) This PR adds support for custom Parquet configuration injection across all file writer factories in Apache Hudi. This feature allows users to inject custom Parquet configurations (e.g., native Parquet bloom filters, custom compression settings, dictionary encoding overrides) at runtime without modifying Hudi's core code. Motivation: Users sometimes need to apply specific Parquet configurations for certain tables or partitions (e.g., disable dictionary encoding for high-cardinality columns, enable native Parquet bloom filters for specific columns, or apply custom encoding strategies). Previously, these configurations were hard-coded or required code changes. This PR introduces a pluggable mechanism via the HoodieParquetConfigInjector interface. Summary and Changelog Summary: Added support for custom Parquet configuration injection across Spark, Avro, and Flink file writers. Users can now implement the HoodieParquetConfigInjector interface and specify it via the hoodie.parquet.config.injector.class configuration to inject custom Parquet settings at write time. Changes: Core Implementation (hudi-client-common): - Added HoodieParquetConfigInjector interface with withProps() method that accepts StoragePath, StorageConfiguration, and HoodieConfig and returns modified configurations Spark Integration (hudi-spark-client): - Modified HoodieSparkFileWriterFactory.newParquetFileWriter() to check for and invoke config injector (lines 66-79) - Added comprehensive tests in TestHoodieParquetConfigInjector: testDisableDictionaryEncodingViaInjector() - validates dictionary encoding can be disabled testInvalidInjectorClassThrowsException() - validates error handling testNoInjectorUsesDefaultConfig() - validates backward compatibility - Tests validate actual Parquet metadata (encodings) rather than just configuration Avro Integration (hudi-hadoop-common): - Modified HoodieAvroFileWriterFactory.newParquetFileWriter() to support config injection (lines 71-85) - Updated getHoodieAvroWriteSupport() signature to accept StorageConfiguration - Added TestHoodieAvroParquetConfigInjector with similar test coverage Flink Integration (hudi-flink-client): - Modified HoodieRowDataFileWriterFactory.newParquetFileWriter() to support config injection (lines 126-140) - Added TestHoodieRowDataParquetConfigInjector with similar test coverage Configuration: - Added HOODIE_PARQUET_CONFIG_INJECTOR_CLASS config key in HoodieStorageConfig - Added withParquetConfigInjectorClass() builder method Impact Public API: New interface: HoodieParquetConfigInjector (marked with appropriate annotations) New configuration: hoodie.parquet.config.injector.class (optional, defaults to empty string) User-facing changes: Users can now customize Parquet settings per file/partition without modifying Hudi code Fully backward compatible - existing code continues to work without changes No performance impact when feature is not used (single isNullOrEmpty() check)
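As a rough illustration of the injector idea described above (not the exact Hudi interface; the parameter types here are simplified placeholders for StoragePath, StorageConfiguration, and HoodieConfig), an implementation might disable dictionary encoding for selected files:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for HoodieParquetConfigInjector.
interface ParquetConfigInjectorSketch {
  /** Returns the Parquet properties to use for the file about to be written. */
  Map<String, String> withProps(String filePath, Map<String, String> storageConf, Map<String, String> writeConfig);
}

// Example: disable dictionary encoding for files under a (hypothetical) high-cardinality partition.
class DisableDictionaryInjector implements ParquetConfigInjectorSketch {
  @Override
  public Map<String, String> withProps(String filePath, Map<String, String> storageConf, Map<String, String> writeConfig) {
    Map<String, String> props = new HashMap<>(storageConf);
    if (filePath.contains("/high_cardinality_partition/")) {
      props.put("parquet.enable.dictionary", "false"); // standard parquet-hadoop write property
    }
    return props;
  }
}
```

The class implementing the real interface would then be referenced via hoodie.parquet.config.injector.class, as described in the changelog above.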
This PR adds support for creating empty clean commits to optimize clean planning performance for append-only datasets.
Problem: In datasets with incremental cleaning enabled that receive infrequent updates or are primarily append-only, the clean planner performs a full table scan on every ingestion run because there are no clean plans to mark progress. This leads to significant performance overhead, especially for large tables.
Solution: Introduce a new configuration hoodie.write.empty.clean.internval.hours that allows creating empty clean commits after a configurable duration. These empty clean commits update the earliestCommitToRetain value, enabling subsequent clean planning operations to only scan partitions modified after the last empty clean, avoiding expensive full table scans.
Summary and Changelog
User-facing changes:
- New advanced config hoodie.write.empty.clean.create.duration.ms (default: -1, disabled) to control when empty clean commits should be created
- When enabled with incremental cleaning, Hudi will create empty clean commits after the specified duration (in milliseconds) to optimize clean planning performance
Detailed changes:
Config Addition (HoodieCleanConfig.java):
- Added hoodie.write.empty.clean.internval.hours config property with builder method
- Marked as advanced config for power users
Clean Execution (CleanActionExecutor.java):
- Modified clean parallelism calculation to ensure minimum of 1 (was causing issues with empty plans)
- Added createEmptyCleanMetadata() method to construct metadata for empty cleans
- Updated runClean() to handle empty clean stats by creating appropriate metadata
Clean Planning (CleanPlanActionExecutor.java):
- Added getEmptyCleanerPlan() method to construct cleaner plans with no files to delete
- Modified requestClean() to return empty plans when partitions list is empty
- Added logic in requestCleanInternal() to check if an empty clean commit should be created based on: incremental cleaning enabled, time since last clean > configured threshold, and a valid earliestInstantToRetain present
Impact
Performance Impact: Positive - significantly reduces clean planning time for append-only or infrequently updated datasets by avoiding full table scans
API Changes: None - purely additive configuration
Behavior Changes: When enabled, users will see empty clean commits in the timeline at the configured intervals. These commits have totalFilesDeleted=0 and empty partition metadata but contain valid earliestCommitToRetain metadata.
* Add KinesisSource
* Fix some issues
* Add tests
* Add more tests
* handle spark parallelism
* Add several critical features:
1. Support aggregated records.
2. Avoid expired shards blocking the stream.
* Add more tests for these corner cases
* Fix CI failures
* Fix a performance issue
* Filter empty shards before read
* Readability
* Add more tests
* Add more tests and a bug fix
* Address some comments
* Address some comments
* Fixed some issues based on test in staging
* Address more comments
* iterator model for shard read
* Refactor
* test on data loss scenarios
* Add last arrival time to checkpoint
* Add smart retry
* Address comments
* refactor
* Fix config naming
* Fix CI OOM in test-common-and-other-modules build step
- Change -DskipTests=true to -Dmaven.test.skip=true to skip test
compilation during full-reactor build, avoiding OOM with new
Kinesis dependencies
- Add -Ddocker.skip=true to prevent hudi-aws DynamoDB Local container
from starting during the build-only step
* Fix Azure CI OOM in UT_FT_10 build step
Same fix as GitHub Actions: add -Dmaven.test.skip=true and
-Ddocker.skip=true to the full-reactor install in UT_FT_10 to
skip test compilation and Docker plugin during the build-only step.
* Fix docker skip property name: use -DskipDocker=true
The parent POM configures docker-maven-plugin with <skip>${skipDocker}</skip>,
so the correct property is -DskipDocker=true, not -Ddocker.skip=true.
* Reduce build parallelism in test-common-and-other-modules
Change -T 2 to -T 1 in the build step to avoid OOM during compilation
when multiple heavy modules (hudi-utilities, hudi-cli-bundle) compile
in parallel under the 4GB heap limit.
* Revert to -DskipTests with -T 1 for test-common build step
-Dmaven.test.skip=true breaks test-jar dependencies across modules.
Use -DskipTests=true (compiles tests, creates test-jars) with -T 1
(sequential build) to stay within 4GB heap. Keep -DskipDocker=true.
* Increase heap to 6g for test-common build step
4GB heap is insufficient for compiling hudi-utilities test sources
with new Kinesis dependencies. Increase to 6GB and restore -T 2
parallelism. GitHub Actions runners have 7GB RAM.
* Use -T 1 with 6g heap for test-common build step
6GB + -T 2 still OOMs during hudi-utilities testCompile when another
module compiles concurrently. Sequential build (-T 1) ensures
hudi-utilities gets the full 6GB heap for its test compilation.
* Fork compiler JVM for test-common build step
The main Maven JVM accumulates memory across 50+ module compilations,
causing OOM when it reaches hudi-utilities testCompile. Fork a separate
JVM for each module's compilation so it starts fresh with up to 4GB.
Restore -T 2 and -Xmx4g for the main Maven process.
* Disable 2 failing TestJsonKinesisSource tests
- testRecordToJsonInvalidJsonWithShouldAddOffsetsReturnsOriginalString:
expects fallback but code throws HoodieException for invalid JSON
- testCreateCheckpointLocalStackSentinelReplacedWithLastSeq:
sentinel replacement not yet implemented
Both flagged in PR review for follow-up fixes.
* Disable e2e Kinesis tests and revert CI config changes
- Revert bot.yml and azure-pipelines to match master
- Remove LocalStack/Testcontainers test infrastructure
(KinesisTestUtils, LocalStackJsonKinesisSource) that caused OOM
during full-reactor test compilation in CI
- Remove testcontainers:localstack and amazon-kinesis-aggregator
dependencies from hudi-utilities
- @Disabled the TestKinesisSource nested class in TestHoodieDeltaStreamer
- Keep all unit tests (TestJsonKinesisSource, TestKinesisCheckpointUtils, etc.)
- E2e tests to be re-enabled in follow-up with proper CI support
* Remove e2e Kinesis tests and deaggregator test that depend on deleted files
Remove TestKinesisSource nested class from TestHoodieDeltaStreamer
(references deleted KinesisTestUtils/LocalStackJsonKinesisSource).
Remove TestKinesisDeaggregator (depends on removed amazon-kinesis-aggregator).
* Fork compiler in test-common build step to avoid OOM
The full-reactor build accumulates JVM memory across 50+ modules,
causing OOM when compiling hudi-utilities test sources with new
Kinesis dependencies. Fork a separate JVM per module compilation
(-Dmaven.compiler.fork=true -Dmaven.compiler.maxmem=4096m) so each
gets a fresh heap.
---------
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
…on-ingestion write commits (in case they have metadata rolled-over) (apache#18576) When archival removes all ingestion commits from the active timeline, code paths that infer schema or checkpoint metadata can fail because they only inspect ingestion-type instants (commits whose WriteOperationType.canUpdateSchema() is true). With Hudi's rolling metadata feature (hoodie.write.rolling.metadata.keys), non-ingestion commits like clustering, compaction, and delete_partition can carry rolled-over schema and checkpoint metadata. However, several inference paths don't search these commit types. This PR ensures schema and checkpoint resolution falls back to non-ingestion write commits when the latest instant doesn't carry the needed metadata. Summary and Changelog Changes: HoodieActiveTimeline / ActiveTimelineV1 / ActiveTimelineV2: Added a boolean filterByCanUpdateSchema overload to getLastCommitMetadataWithValidSchema. When false, the canUpdateSchema filter is skipped, allowing schema discovery from any commit type (clustering, compaction, delete_partition). The no-arg version retains the original behavior (filter enabled). TableSchemaResolver: Changed getLatestCommitMetadataWithValidSchema() to call getLastCommitMetadataWithValidSchema(false), so schema resolution searches all completed commit types instead of only ingestion commits. BaseHoodieClient: In mergeRollingMetadata, empty-string values are now treated as "missing" when checking both the current commit's existing metadata and values found in prior commits. This prevents an empty string from short-circuiting the walkback. InitialCheckpointFromAnotherHoodieTimelineProvider: Switched from getCommitsTimeline() to getWriteTimeline() to include compaction/logcompaction instants. Filters out empty checkpoint strings (not just nulls). Re-throws IOException as HoodieIOException instead of swallowing it. Tests: Added 2 unit tests in TestTimelineUtils (schema lookup ignoring operation type, empty schema returns empty) and 1 functional test in TestHoodieClientOnCopyOnWriteStorage (rolling metadata preserved across clustering after archival, with TableSchemaResolver still able to find schema). --------- Co-authored-by: Krishen Bhan <“bkrishen@uber.com”>
…NewSchema (apache#18580) Fixes issue: apache#18578 HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal switches on newSchema.getType() and only named RECORD/ENUM/ARRAY/MAP/UNION. BLOB (apache#18108) and VARIANT (apache#17833) are Hudi logical types physically stored as Avro records but exposed as distinct HoodieSchemaTypes, so a new schema typed BLOB/VARIANT fell through to rewritePrimaryType and threw "cannot support rewrite value for schema type". This reproduces on the Hive read path whenever Hive projects from its HMS-derived struct shape (record name = column name, type field = plain STRING) onto Hudi's canonical BLOB schema (record "blob", type = ENUM blob_storage_type, logicalType "blob") - the exact signature seen in ITTestCustomTypeHiveSync#testBlobTypeWithHiveSyncSQL. VECTOR was fine by accident because it maps to Avro FIXED. Add case BLOB and case VARIANT fallthrough to the existing RECORD body. Inner field layouts are fixed by BlobLogicalType.validate / VariantLogicalType.validate, so field-by-name iteration is correct. The existing ENUM case at line 137 already handles the STRING -> ENUM conversion for the BLOB "type" field. Tests pin the fix without Spark / Hive / Testcontainers - they call HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema directly with synthetic schemas that mirror the E2E failure signature, for both BLOB and VARIANT.
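A toy sketch of the dispatch change, using a stand-in enum instead of Hudi's HoodieSchemaType; the point is only that BLOB and VARIANT share the RECORD body because both are physically stored as Avro records:

```java
public class RewriteDispatchSketch {
  enum LogicalType { RECORD, ENUM, ARRAY, MAP, UNION, BLOB, VARIANT, PRIMITIVE }

  static String rewrite(LogicalType newSchemaType) {
    switch (newSchemaType) {
      case RECORD:
      case BLOB:     // falls through: BLOB is an Avro record under the hood
      case VARIANT:  // falls through: VARIANT is an Avro record under the hood
        return "rewrite field-by-name like a record";
      case ENUM:
        return "rewrite enum (also covers the STRING -> ENUM case for BLOB's type field)";
      case ARRAY:
      case MAP:
      case UNION:
        return "rewrite container";
      default:
        return "rewrite primitive"; // where BLOB/VARIANT previously landed and threw
    }
  }

  public static void main(String[] args) {
    System.out.println(rewrite(LogicalType.BLOB));
    System.out.println(rewrite(LogicalType.VARIANT));
  }
}
```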
…ange walk (apache#18619) amazon-kinesis-deaggregator (added in apache#18224) pulls aws-lambda-java-events 1.1.0, whose POM declares aws-java-sdk-* deps with soft ranges like [1.10.5,). Maven resolves these by walking every published patch version, producing hundreds of POM downloads per clean build. Importing aws-java-sdk-bom in dependencyManagement overrides the ranges with a single deterministic version, eliminating the walk.
…atalogTable (apache#18654) Spark's HiveExternalCatalog.alterTable / createTable rejects table properties whose keys start with "spark.sql." with: AnalysisException: Cannot persist <table> into Hive metastore as table property keys may not start with 'spark.sql.': [spark.sql.create.version, spark.sql.sources.provider, spark.sql.sources.schema.partCol.0, spark.sql.sources.schema.numParts, spark.sql.sources.schema.numPartCols, spark.sql.sources.schema.part.0] These keys are reserved for Spark's internal use (provider, schema parts, create version) and Spark itself writes them when persisting a CatalogTable. On the way back through getTable they appear in the parameters map, and toCatalogTable currently passes them straight through. The next alter_table call then trips the validation and the entire HoodieHiveSyncClient flow fails - no actual sync happens. Strip "spark.sql.*" keys in toCatalogTable before constructing the CatalogTable. Spark re-derives and writes them from the CatalogTable, so dropping them on the way in is safe. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
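A minimal sketch of the filtering step described above (the method name is illustrative); Spark re-derives these keys itself when it persists the CatalogTable, so dropping them on the way in is safe:

```java
import java.util.HashMap;
import java.util.Map;

public class SparkReservedKeyFilterSketch {
  static Map<String, String> stripSparkSqlKeys(Map<String, String> hmsParameters) {
    Map<String, String> filtered = new HashMap<>();
    hmsParameters.forEach((key, value) -> {
      // Keys under spark.sql.* are reserved by HiveExternalCatalog and must not be passed back.
      if (!key.startsWith("spark.sql.")) {
        filtered.put(key, value);
      }
    });
    return filtered;
  }
}
```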
HoodieHiveSyncClient invokes IMetaStoreClient.setMetaConf at construction time to forward hive.metastore.callerContext.* properties to the metastore for audit/tracing. With Spark's external catalog there is no remote HMS to receive those values, so throwing UnsupportedOperationException unconditionally breaks every sync client that uses SparkCatalogMetaStoreClient (it fails before any actual catalog operation can run). Accept the call silently. The caller-context properties are diagnostic metadata; dropping them is the correct semantic for a non-thrift catalog backend. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…8542) Addresses apache#18523. Shrinks the Java 17 integration-test image from ~3.56 GB to ~2.58 GB (~27%) without changing any runtime behavior. The container commands, environment variables, exposed ports, and entrypoint are identical. base_java17/Dockerfile: - switch base image from eclipse-temurin:17-jdk to 17-jre-jammy. The container only runs Hadoop; no Java compilation happens inside it, so the JDK toolchain is not needed. - convert to a multi-stage build. Stage 1 downloads and extracts the Hadoop tarball; stage 2 only COPYs the extracted tree. curl, ca-certificates, and the tar.gz no longer land in the final layer. - use --no-install-recommends and clean apt lists in the runtime stage. - drop the unused .asc signature download and the now-dead wget dep. spark_base/Dockerfile: - replace the Python-3.10.14-from-source build (which pulled in build-essential and a full compile toolchain, then built CPython with --enable-optimizations inside the image) with the distro python3-minimal + python3-pip packages. PySpark only needs a Python runtime at runtime.
…pache#18659) * feat: Introduce a Spark procedure to trigger LSM timeline compaction * fixup: address review on RunTimelineCompactionProcedure - Acquire the txn state-change lock around compactAndClean so the procedure cannot race with a concurrent archival/compaction over the LSM timeline manifest - Use TimeUnit.NANOSECONDS.toMillis for self-documenting unit conversion
…pache#18584) * [MINOR] Add date logical type test to TestAvroConversionUtils Add a test case that verifies createConverterToRow correctly handles Avro's date logical type (int with logicalType=date), ensuring the conversion from epoch-days integer to java.sql.Date preserves the correct date value. * [MINOR] Address review comments: rename convertor to converter, add comments --------- Co-authored-by: gallu <gallu@uber.com>
…paction tasks (apache#18675) * Honor SparkSession overrides for rebase mode and timezone in compaction tasks When MOR compaction runs outside a Spark SQL execution context (e.g. a standalone CompactTask runner), `SQLConf.get` on the executor task thread returns a fresh fallback `SQLConf` with default values, not the user's SparkSession overrides. As a result, `Spark{3_3,3_4,3_5,4_0}Adapter .getDateTimeRebaseMode()` resolved to `EXCEPTION` even when the user had set `spark.sql.parquet.datetimeRebaseModeInWrite=LEGACY`, producing `SparkUpgradeException [INCONSISTENT_BEHAVIOR_CROSS_VERSION .WRITE_ANCIENT_DATETIME]` during compaction of MOR tables containing pre-1900 timestamps. The same gap affected `HoodieRowParquetWriteSupport.init()`'s `sessionLocalTimeZone` read. Adapter and WriteSupport now resolve the value in this order: 1. SQLConf override (so `spark.conf.set(...)` on the SparkSession takes effect on the driver and inside SQL execution contexts). 2. SparkConf via SparkEnv.get.conf (broadcast to every executor at startup, so user-set keys are honored on executor tasks running outside a SQL execution context). 3. The ConfigEntry's own default (or SQLConf.sessionLocalTimeZone for the timezone helper). Adds TestSparkAdapterRebaseModePropagation (3 methods) covering rebase mode and timezone propagation into vanilla parallelize().map() task closures. Each test fails without the fix. * Apply fix to Spark4_1Adapter; use flatMap+Option to avoid null inside Option * Use SQLConf.getConf(entry, null) instead of getConfString(key, null) * Make Spark4_1 consistent with 3.x/4.0; add default-behavior test; trim scaladocs * Add unit test for resolveSessionLocalTimeZone in hudi-spark-client * Add SQLConf-override test method to lift coverage on new code * Drop redundant public modifiers from JUnit 5 test class and methods * Read expected default from SQLConf so test works on Spark 3.x and 4.1 * Document why init() coverage lives in hudi-spark integration tests * Inline single-use Parquet metadata keys; keep only timeZone constant * Add SparkConf-branch test for resolveSessionLocalTimeZone
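A hedged sketch of the three-tier resolution order described above, using plain string keys; the real adapters work with typed ConfigEntry objects and SQLConf.getConf(entry, null), so this approximates the lookup order rather than the actual code:

```java
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.internal.SQLConf;

public class SessionConfFallbackSketch {
  static String resolve(String key, String hardDefault) {
    // 1. SQLConf override: honors spark.conf.set(...) on the driver and inside SQL execution contexts.
    String fromSqlConf = SQLConf.get().getConfString(key, null);
    if (fromSqlConf != null) {
      return fromSqlConf;
    }
    // 2. SparkConf via SparkEnv: the SparkConf is shipped to every executor at startup, so
    //    user-set keys are visible even in task threads outside a SQL execution context.
    SparkEnv env = SparkEnv.get();
    if (env != null && env.conf().contains(key)) {
      return env.conf().get(key);
    }
    // 3. Fall back to the entry's own default (or the session-local timezone default).
    return hardDefault;
  }
}
```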
Increase MAVEN_OPTS heap from 4g to 8g and compiler maxmem from 4096m to 8192m in GitHub Actions bot.yml. Also add -Xmx8g to the Azure Pipelines bundle validation build step. Co-authored-by: hudi-agent <277184175+hudi-agent@users.noreply.github.com>
When rolling metadata is configured (hoodie.write.rolling.metadata.keys), important metadata like schema and checkpoint keys are carried forward across commits. However, clean instants do not participate in this rolling mechanism: they neither receive rolled-over metadata nor serve as a source for subsequent lookups. After archival removes old ingestion commits, if only clean instants remain on the active timeline between surviving commits, the chain of rolled-over metadata can break. This PR ensures that clean commits also carry rolled-over metadata in their extraMetadata field, preserving the rolling metadata chain across archival. --------- Co-authored-by: Krishen Bhan <“bkrishen@uber.com”>
…he#18489) This PR moves the checkpoint metadata lookup helper into hudi-common so ingestion-related code can reuse the same timeline utility instead of keeping the logic in utilities-only code.
…gTable (apache#18672) Hudi's `HMSDDLExecutor.createTable` sets both `tableType=EXTERNAL_TABLE` and `parameters[EXTERNAL]=TRUE` on the Hive Table object when the table is external. When that Table flows through `SparkCatalogMetaStoreClient` into `HiveExternalCatalog`, `verifyTableProperties` rejects: AnalysisException: Cannot set or change the preserved property key: 'EXTERNAL' Spark uses `CatalogTableType.EXTERNAL` on the `CatalogTable` itself to encode external-ness, and treats `EXTERNAL=...` as a duplicate (and forbidden) encoding. We already map `tableType` correctly via `if ("EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType))`, so dropping the property in the same filter that already strips `spark.sql.*` is safe. Same family as apache#18654 (filter `spark.sql.*`). Adds a regression test mirroring the real `HMSDDLExecutor` shape: `tableType=EXTERNAL_TABLE` AND `parameters[EXTERNAL]=TRUE`. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…64 timestamp dispatch (FLINK-35702) (apache#18636) * feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 timestamp dispatch (FLINK-35702) * Minor fixes
…pache#18581) Fixes issue: apache#18577 When Hive's FetchOperator pushes nested column projection (e.g. `SELECT blob_data.reference.external_path`) through Parquet via `hive.io.file.readNestedColumn.paths`, the reader returns a compacted ArrayWritable holding only the projected sub-fields in low slots, while oldSchema stays the full 3-field canonical BLOB (BlobLogicalType.validate rejects partial field lists; pruneDataSchema deliberately preserves the canonical shape). Positional indexing into the compacted array AIOBEs, and even with a bounds guard, Hive's ObjectInspector downstream expects projected values at their canonical positions - the rewrite must remap, not just survive. Introduce a projection-aware rewrite path: - HoodieProjectionMask (new) - immutable per-level descriptor of physical layout. isCanonicalAtThisLevel() means schema positions apply; otherwise physicalIndexOf / physicalOrder map field names to physical slots. - HoodieColumnProjectionUtils.buildNestedProjectionMask() - parses hive.io.file.readNestedColumn.paths, walks RECORD / BLOB / VARIANT, returns the matching mask (or all() when projection is absent). - HiveHoodieReaderContext threads the mask into a new 5-arg rewriteRecordWithNewSchema overload. - HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal branches on the mask: - rewriteCanonicalRecord - legacy positional logic with a defensive oldField.pos() < arrayLength guard. - rewriteCompactedRecord - iterates physicalOrder() and writes each projected slot at its canonical position so the downstream ObjectInspector finds fields where it expects them. The compacted path is the primary fix; the canonical-path bounds guard is a defensive fallback. Tests: TestHoodieColumnProjectionUtils covers mask construction; TestHoodieArrayWritableSchemaUtils covers the AIOBE reproducer, compacted round-trip, and a canonical-shape regression. HoodieSchemaTestUtils gains createPlainBlobRecord and createPlainVariantRecord helpers (variant helper for upcoming VARIANT parity).
…che#18583) * test(schema): Add MOR log-only compaction tests for custom types Cover the invariant that the HoodieSchema.TYPE_METADATA_FIELD descriptor and payload shape of a custom-typed column survive inline compaction of a log-only MOR table into a base file. - TestVectorDataSource: add testMorLogOnlyCompactionPreservesVectorMetadata (5 commits via SQL + MERGE INTO to trigger default inline compaction). - TestVariantDataType: equivalent VARIANT test, gated on Spark 4.0+, asserting native VariantType round-trips through compaction. - TestBlobDataType (new): BLOB INLINE and BLOB OUT_OF_LINE cases. Inline uses named_struct with hex byte literals; out-of-line creates real files via BlobTestHelpers.createTestFile and verifies bytes via read_blob(). * test(schema): Address review comments on MOR log-only compaction tests - Pin hoodie.compact.inline.max.delta.commits = '5' on all 4 tables so compaction triggers deterministically rather than via the implicit default - Rename path to externalPath in outOfLineBlobLiteral - Fail with the missing id in embeddingOf instead of a bare .get - Extract val tablePath in the variant test for consistency
… path (FLINK-35702) (apache#18700)
lance-spark's LanceArrowUtils rejects VariantType as UNSUPPORTED_DATATYPE, and Spark's final ColumnVector.getVariant reads child(0)=value, child(1)=metadata positionally - so a parquet-variant-spec struct (metadata first) deserializes swapped and fails with MALFORMED_VARIANT.
Write side (HoodieSparkLanceWriter): rewrite top-level VariantType to Struct[metadata: binary, value: binary] tagged hudi_type=VARIANT, and project each InternalRow via SparkAdapter.createVariantValueWriter. Spark 3.x stays a no-op via sparkAdapter.isVariantType.
Read side (LanceVariantColumnVector + LanceRecordIterator): wrap the on-disk struct so getChild(0) returns the value column and getChild(1) the metadata column. The .lance file keeps spec-compliant ordering; positional getVariant reconstructs a correct VariantVal.
Spark 4.0/4.1 split: Spark4VariantProjectedRow is now abstract in hudi-spark4-common, with concrete Spark40VariantProjectedRow and Spark41VariantProjectedRow subclasses in the version-specific modules. This is required because Spark 4.1's SpecializedGetters adds abstract getGeometry/getGeography methods (returning GeometryVal/GeographyVal, types that don't exist in Spark 4.0), so the shared module can't implement them. createVariantProjectedRow moves from BaseSpark4Adapter to Spark4_0Adapter / Spark4_1Adapter, mirroring the existing createInternalRow split for HoodieInternalRow.
Scope: top-level variant only; support for nested variants in Struct/Array/Map is a follow-up.
Automated review skipped: too many files. This PR contains 300 files, which is 150 over the limit of 150. Configuration used: defaults. Review profile: CHILL.
Mirror of apache#18599 for automated bot review.
Original author: @voonhous
Base branch: master