From 3dd15e05a0ddd58d9630d526f365cee605a67aa4 Mon Sep 17 00:00:00 2001 From: voon Date: Fri, 20 Mar 2026 01:13:02 +0800 Subject: [PATCH 1/6] feat(blob): Add RFC-100 external blob cleaner design --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 749 ++++++++++++++++++++ rfc/rfc-100/rfc-100-blob-cleaner-problem.md | 677 ++++++++++++++++++ 2 files changed, 1426 insertions(+) create mode 100644 rfc/rfc-100/rfc-100-blob-cleaner-design.md create mode 100644 rfc/rfc-100/rfc-100-blob-cleaner-problem.md diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md new file mode 100644 index 0000000000000..85404644af084 --- /dev/null +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -0,0 +1,749 @@ + + +# RFC-100 Part 2: Blob Cleanup for Unstructured Data + +## Proposers + +- @voon + +## Approvers + +- (TBD) + +## Status + +Issue: + +> Please keep the status updated in `rfc/README.md`. + +--- + +## Abstract + +When Hudi cleans expired file slices, out-of-line blob files they reference may become orphaned -- +still consuming storage but unreachable by any query. This RFC extends the existing file slice +cleaner to identify and delete these orphaned blob files safely and efficiently. The design uses a +three-stage pipeline: (1) per-file-group set-difference to find locally-orphaned blobs, (2) an MDT +secondary index lookup for cross-file-group verification of externally-referenced blobs, and (3) +container file lifecycle resolution. For Hudi-created blobs, cleanup is essentially free -- structural +path uniqueness eliminates cross-file-group concerns entirely. For user-provided external blobs, +targeted index lookups scale with the number of candidates, not the table size. Tables without blob +columns pay zero cost. + +--- + +## Background + +### Why Blob Cleanup Is Needed + +RFC-100 introduces out-of-line blob storage for unstructured data (images, video, documents). 
A +record's `BlobReference` field points to an external blob file by `(path, offset, length)`. When +the cleaner expires old file slices, the blob files they reference may no longer be needed -- but the +existing cleaner has no concept of transitive references. It deletes file slices without considering +the blob files they point to. Without blob cleanup, orphaned blobs accumulate indefinitely. + +### Two Blob Flows + +Blob cleanup must support two distinct entry flows with fundamentally different properties: + +**Flow 1 -- Hudi-created blobs.** Blobs created by Hudi's write path, stored at +`{table}/.hoodie/blobs/{partition}/{col}/{instant}/{blob_id}`. The commit instant in the path +guarantees uniqueness (C11), and blobs are scoped to a single file group (P3). Cross-file-group +sharing does not occur. This is the expected majority flow for Phase 3 workloads. + +**Flow 2 -- User-provided external blobs.** Users have existing blob files in external storage +(e.g., `s3://media-bucket/videos/`). Records reference these blobs directly by path. Hudi manages +the *references*, not the *storage layout*. Cross-file-group sharing is common -- multiple records +across different file groups can point to the same blob. This is the expected primary flow for +Phase 1 workloads. + +| Property | Flow 1 (Hudi-created) | Flow 2 (External) | +|---------------------------|-----------------------------------|--------------------------------------| +| Path uniqueness | Guaranteed (instant in path, C11) | Not guaranteed (user controls) | +| Cross-FG sharing | Does not occur (FG-scoped) | Common (multiple records, same blob) | +| Writer/cleaner race | Cannot occur (D2) | Can occur (D3) | +| Per-FG cleanup sufficient | Yes | No -- cross-FG verification needed | + +### Constraints and Requirements Reference + +Full descriptions and failure modes in [Appendix B](rfc-100-blob-cleaner-problem.md). 
+ +| ID | Constraint | Flow 1 | Flow 2 | Remarks | +|-----|-------------------------------------------------|--------|--------|------------------------------| +| C1 | Blob immutability (append-once, read-many) | Y | Y | | +| C2 | Delete-and-re-add same path | -- | Y | Eliminated for Flow 1 by C11 | +| C3 | Cross-file-group blob sharing | -- | Y | Common for external blobs | +| C4 | Container files (`(offset, length)` ranges) | Y | Y | | +| C5 | MOR log updates shadow base file blob refs | Y | Y | | +| C6 | Existing cleaner is per-file-group scoped | Y | Y | | +| C7 | OCC is per-file-group | Y | Y | No global contention allowed | +| C8 | Clustering moves blob refs between file groups | Y | Y | | +| C9 | Savepoints freeze file slices and blob refs | Y | Y | | +| C10 | Rollback can invalidate or resurrect references | Y | Y | | +| C11 | Blob paths include commit instant | Y | -- | Eliminates C2, C3, C13 | +| C12 | Archival removes commit metadata | Y | Y | | +| C13 | Cross-FG verification needed at scale | -- | Y | | + +| ID | Requirement | +|-----|------------------------------------------------------------------| +| R1 | No premature deletion (hard invariant) | +| R2 | No permanent orphans (bounded cleanup) | +| R3 | Container awareness (range-level liveness) | +| R4 | MOR correctness (over-retention acceptable, under-retention not) | +| R5 | Concurrency safety (no global serialization) | +| R6 | Scale proportional to work, not table size | +| R7 | No cost for non-blob tables | +| R8 | All cleaning policies supported | +| R9 | Crash safety and idempotency | +| R10 | Observability (metrics for deleted, retained, reclaimed) | + +--- + +## Design Overview + +### Design Philosophy + +Blob cleanup extends the existing `CleanPlanner` / `CleanActionExecutor` pipeline -- same timeline +instant, same plan-execute-complete lifecycle, same crash recovery and OCC integration. A +`hasBlobColumns()` check gates all blob logic so non-blob tables pay zero cost. 
+ +The two flows have different cost structures, and the design keeps them separate. Flow 1 +(Hudi-created blobs) gets per-FG cleanup with no cross-FG overhead. Flow 2 (external blobs) gets +targeted cross-FG verification via MDT secondary index. Dispatch is a string prefix check on the +blob path. + +### Three-Stage Pipeline + +| Stage | Scope | Purpose | When it runs | +|-------------|----------------------|----------------------------------------------------------------------------------|----------------------------------------| +| **Stage 1** | Per-file-group | Collect expired/retained blob refs, compute set difference, dispatch by category | Always (for blob tables) | +| **Stage 2** | Cross-file-group | Verify external blob candidates against MDT secondary index or fallback scan | Only when external candidates exist | +| **Stage 3** | Container resolution | Determine delete vs. flag-for-compaction at the container level | Only when container blobs are involved | + +### Independent Implementability + +The three stages have clean input/output interfaces and can be implemented, tested, and shipped +independently: + +| Stage | Input | Output | +|---------|---------------------------------------------------------|-----------------------------------------------------| +| Stage 1 | `FileGroupCleanResult` (expired + retained slices) | `hudi_blob_deletes`, `external_candidates` | +| Stage 2 | `external_candidates`, `cleaned_fg_ids` | `external_deletes` | +| Stage 3 | `hudi_blob_deletes` + `external_deletes`, retained refs | `blob_files_to_delete`, `containers_for_compaction` | + +A shared foundation layer must land first (see [Rollout / Adoption Plan](#rollout--adoption-plan)), after which stages +can proceed in any order. 
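
The prefix-based dispatch mentioned under Design Philosophy can be illustrated with a minimal sketch. The class name `BlobDispatch`, the `Category` enum, and the trailing-slash normalization are assumptions made for illustration; only the `.hoodie/blobs/` layout comes from this RFC.

```java
// Hypothetical sketch: classify a blob path as Hudi-created or external.
// Names here are illustrative and not part of any existing Hudi API.
final class BlobDispatch {
    enum Category { HUDI_CREATED, EXTERNAL }

    // Layout described in this RFC: {table}/.hoodie/blobs/...
    static final String BLOB_DIR = "/.hoodie/blobs/";

    static Category classify(String tablePath, String blobPath) {
        // Assumption: strip trailing slashes so "s3://b/tbl/" and "s3://b/tbl" behave alike.
        String base = tablePath.replaceAll("/+$", "");
        return blobPath.startsWith(base + BLOB_DIR)
            ? Category.HUDI_CREATED
            : Category.EXTERNAL;
    }
}
```

Because the check is a pure string comparison, it needs no I/O, which is consistent with the zero-cost classification claimed for the dispatch step.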
+ +### Key Decisions + +| Decision | Choice | Rationale | +|---------------------|---------------------------------------------------------|--------------------------------------------------------------------| +| Blob identity | `(path, offset, length)` tuple | Handles containers (C4) and path reuse (C2) correctly | +| Cleanup scope | Per-FG (Hudi blobs) + MDT index lookup (external blobs) | Aligns with OCC (C7) and existing cleaner (C6); scales for C13 | +| Dispatch mechanism | Path prefix check on blob path | Zero-cost classification; Hudi blobs match `.hoodie/blobs/` prefix | +| Cross-FG mechanism | MDT secondary index on `reference.external_path` | Short-circuits on first non-cleaned FG ref; first-class for Flow 2 | +| Write-path overhead | None (Flow 1); MDT index maintenance (Flow 2) | Index maintained by existing MDT pipeline, not a new write cost | +| MOR strategy | Over-retain (union of base + log refs) | Safe (C5, R4); cleaned after compaction | +| Container strategy | Tuple-level tracking; delete only when all ranges dead | Correct (C4, R3); partial containers flagged for blob compaction | + +```mermaid +flowchart LR + subgraph Planning["CleanPlanActionExecutor.requestClean()"] + direction TB + Gate{"hasBlobColumns()?"} + Gate -- No --> Skip["Skip blob cleanup
(zero cost)"] + Gate -- Yes --> CP + + subgraph CP["CleanPlanner (per-partition, per-FG)"] + direction TB + Policy["Policy method
→ FileGroupCleanResult
(expired + retained slices)"] + S1["Stage 1
Per-FG blob ref
set difference + dispatch"] + Policy --> S1 + end + + S1 --> S2["Stage 2
Cross-FG verification
(MDT secondary index)"] + S1 -->|hudi_blob_deletes| S3 + S2 -->|external_deletes| S3["Stage 3
Container lifecycle
resolution"] + end + + subgraph Plan["HoodieCleanerPlan"] + FP["filePathsToBeDeleted
(existing)"] + BP["blobFilesToDelete
(new)"] + CC["containersToCompact
(new)"] + end + + S3 --> BP + S3 --> CC + CP --> FP + + subgraph Execution["CleanActionExecutor.runClean()"] + direction TB + DF["Delete file slices
(existing, parallel)"] + DB["Delete blob files
(new, parallel)"] + RC["Record containers
for blob compaction"] + end + + FP --> DF + BP --> DB + CC --> RC +``` + +--- + +## Algorithm + +### Stage 1: Per-File-Group Local Cleanup + +Stage 1 runs after the existing policy logic determines which file slices are expired and retained +for a given file group. It collects blob refs from both sets and computes locally-orphaned blobs by +set difference. + +``` +Input: A file group FG with expired_slices and retained_slices (from policy) +Output: hudi_blob_deletes -- blobs safe to delete immediately + external_candidates -- external blobs needing cross-FG verification + +for each file_group being cleaned: + + // Collect expired blob refs (base files + log files) + // Must read log files: blob refs introduced and superseded within the log + // chain before compaction would otherwise become permanent orphans. + expired_refs = Set<(path, offset, length)>() + for slice in expired_slices: + for ref in extractBlobRefs(slice.baseFile): // columnar projection + if ref.type == OUT_OF_LINE and ref.managed == true: + expired_refs.add((ref.path, ref.offset, ref.length)) + for ref in extractBlobRefs(slice.logFiles): // full record read + if ref.type == OUT_OF_LINE and ref.managed == true: + expired_refs.add((ref.path, ref.offset, ref.length)) + + if expired_refs is empty: + continue // no blob work for this FG + + // Collect retained blob refs (base files only) + // Cleaning is fenced on compaction: retained base files contain the merged + // state. Log reads are unnecessary -- any shadowed base ref causes safe + // over-retention, cleaned after the next compaction cycle. 
+ retained_refs = Set<(path, offset, length)>() + for slice in retained_slices: + for ref in extractBlobRefs(slice.baseFile): // columnar projection only + if ref.type == OUT_OF_LINE and ref.managed == true: + retained_refs.add((ref.path, ref.offset, ref.length)) + + // Compute local orphans by set difference + local_orphans = expired_refs - retained_refs + + // Dispatch by blob category + for ref in local_orphans: + if ref.path starts with TABLE_PATH + "/.hoodie/blobs/": + hudi_blob_deletes.add(ref) // P3: no cross-FG refs possible + else: + external_candidates.add(ref) // C13: cross-FG refs are common +``` + +**Correctness notes:** + +- **Hudi-created blobs:** If a blob ref appears in expired but not retained slices of the same FG, + it is globally orphaned -- Hudi blobs are FG-scoped (C11), so no cross-FG check is needed. +- **MOR -- expired side reads base + logs:** Blob refs can be introduced and superseded entirely + within the log chain (e.g., `log@t2: row1→blob_B`, then `log@t3: row1→blob_C`). After + compaction, `blob_B` exists only in the expired log. Skipping logs would orphan it permanently. +- **MOR -- retained side reads base only:** Cleaning is fenced on compaction, so retained base + files contain the merged state. Shadowed base refs cause over-retention (safe), cleaned after + the next compaction. +- **Savepoints:** Inherited from existing cleaner -- savepointed slices stay in the retained set. +- **Replaced FGs (clustering):** `retained_slices` is empty, so all blob refs become candidates. + Hudi blobs are safe to delete (clustering creates new blobs in the target FG). External blobs + flow to Stage 2 (clustering copies the pointer, so Stage 2 finds it in the target FG). + +### Stage 2: Cross-FG Verification (External Blobs) + +Stage 2 executes only when `external_candidates` is non-empty. For Flow 1 workloads (Hudi-created +blobs only), this stage is skipped entirely. 
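
Looking back at Stage 1, the per-file-group set difference can be sketched in Java as follows. `BlobRef` and `Stage1` are hypothetical names used only for illustration; the sketch assumes blob identity is the `(path, offset, length)` tuple and that `offset` is null for whole-file blobs, as the Stage 1 and Stage 3 pseudocode suggests.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical value type: blob identity is the (path, offset, length) tuple.
final class BlobRef {
    final String path;
    final Long offset;   // null for whole-file (non-container) blobs
    final Long length;

    BlobRef(String path, Long offset, Long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof BlobRef)) return false;
        BlobRef r = (BlobRef) o;
        return path.equals(r.path)
            && Objects.equals(offset, r.offset)
            && Objects.equals(length, r.length);
    }

    @Override public int hashCode() {
        return Objects.hash(path, offset, length);
    }
}

final class Stage1 {
    // local_orphans = expired_refs - retained_refs, computed per file group.
    static Set<BlobRef> localOrphans(Collection<BlobRef> expired,
                                     Collection<BlobRef> retained) {
        Set<BlobRef> orphans = new HashSet<>(expired);
        orphans.removeAll(new HashSet<>(retained));
        return orphans;
    }
}
```

Comparing full tuples rather than paths keeps a container range alive independently of its sibling ranges, which is what the container handling in Stage 3 relies on.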
+ +#### Primary path: MDT secondary index + +When the MDT secondary index on `reference.external_path` is available and fully built: + +``` +Input: external_candidates, cleaned_fg_ids +Output: external_deletes (confirmed globally orphaned) + +candidate_paths = external_candidates.map(ref -> ref.path).distinct() + +// Step 1: Batched prefix scan on secondary index +// Key format: escaped(external_path)$escaped(record_key) +// Returns ALL record keys that reference each candidate path +path_to_record_keys = mdtMetadata.readSecondaryIndexDataTableRecordKeysWithKeys( + HoodieListData.eager(candidate_paths), indexPartitionName) + .groupBy(pair -> pair.getKey()) + +// Step 2: Batch record index lookup -- ONE call for ALL record keys +// Sorts keys internally, single sequential forward-scan through HFile. +all_record_keys = path_to_record_keys.values().flatMap() +all_locations = mdtMetadata.readRecordIndexLocations( + HoodieListData.eager(all_record_keys)) // -> Map + +// Step 3: In-memory resolution with short-circuit per candidate +for path in candidate_paths: + record_keys = path_to_record_keys.getOrDefault(path, []) + + if record_keys is empty: + external_deletes.addAll(candidates for this path) // globally orphaned + continue + + found_live_reference = false + for rk in record_keys: + location = all_locations.get(rk) + if location != null and location.fileId NOT in cleaned_fg_ids: + found_live_reference = true + break // short-circuit (in-memory) + + if not found_live_reference: + external_deletes.addAll(candidates for this path) // all refs in cleaned FGs +``` + +**Cost model.** Three steps: (1) batched prefix scan on secondary index, (2) batched record index +lookup in a single sorted HFile scan, (3) in-memory resolution with short-circuit. Steps 1 and 2 +are each a single I/O pass; step 3 is pure hash set lookups. 
+ +| Step | I/O | Estimated cost (2K candidates) | +|---------------------------|-----------------------------------------------|--------------------------------| +| 1. Prefix scan (batched) | 1 HFile open + forward scan of N prefix keys | ~2-5s | +| 2. Record index (batched) | 1 HFile open + forward scan of 6K sorted keys | ~1-2s | +| 3. In-memory resolution | Hash set checks (cleaned_fg_ids) | ~0ms | + +*Estimates assume cloud object storage (S3/GCS/ADLS), ~10-100ms per-read latency, ~50-200 MB/s +sequential throughput, 64-256KB HFile blocks. Step 1: a 250K-entry secondary index is ~50MB; a +sorted scan for 2K prefixes reads ~200-500 blocks, dominated by HFile open (~100-200ms) plus +sequential block reads with cold-cache latency overhead. Step 2: similar profile, fewer blocks +hit relative to index size. Pending benchmarking.* + +**Index definition.** Uses the existing `HoodieIndexDefinition` mechanism with +`sourceFields = ["", "reference", "external_path"]`. The nested field path is supported +by `HoodieSchemaUtils.projectSchema()` and `SecondaryIndexRecordGenerationUtils`. No new index +infrastructure is needed. + +**Safety check.** The cleaner verifies the index is fully built before using it via +`getMetadataPartitions()` and `getMetadataPartitionsInflight()`. A partially-built index falls +back to the table scan path. + +#### Fallback path: table scan with circuit breaker + +When the MDT secondary index is unavailable, Stage 2 falls back to a parallelized table scan +across all partitions. A circuit breaker (`hoodie.cleaner.blob.external.scan.max.candidates`, +default 1000) defers cleanup if candidates exceed the threshold, preventing the scan from becoming +a bottleneck on large tables. The operator is warned to enable the MDT secondary index. 
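
Step 3 of the primary path and the fallback circuit breaker are both plain in-memory decisions, so they can be sketched without any Hudi dependencies. In this hedged sketch the two maps stand in for the results of the secondary index scan and the record index lookup; the class and method names are hypothetical, and file group IDs are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of Stage 2, step 3, plus the fallback circuit breaker.
final class Stage2Resolution {
    // Returns candidate paths that have no reference outside the cleaned file groups.
    static List<String> globallyOrphaned(
            List<String> candidatePaths,
            Map<String, List<String>> pathToRecordKeys,  // stand-in for the step 1 result
            Map<String, String> recordKeyToFileId,       // stand-in for the step 2 result
            Set<String> cleanedFgIds) {
        List<String> deletes = new ArrayList<>();
        for (String path : candidatePaths) {
            boolean live = false;
            List<String> keys =
                pathToRecordKeys.getOrDefault(path, Collections.<String>emptyList());
            for (String rk : keys) {
                String fileId = recordKeyToFileId.get(rk);
                if (fileId != null && !cleanedFgIds.contains(fileId)) {
                    live = true;  // short-circuit on the first live reference
                    break;
                }
            }
            if (!live) {
                deletes.add(path);  // no keys at all, or every reference is in a cleaned FG
            }
        }
        return deletes;
    }

    // Fallback path: defer external blob cleanup when candidates exceed the threshold.
    static boolean shouldDeferScan(int candidateCount, int maxCandidates) {
        return candidateCount > maxCandidates;
    }
}
```

The second argument of `shouldDeferScan` would correspond to `hoodie.cleaner.blob.external.scan.max.candidates`; treating the threshold as exclusive is an assumption, since the text only says cleanup is deferred when candidates exceed it.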
+ +#### Decision matrix + +| Condition | Path used | Cost | Suitable for | +|-----------------------------|---------------|-----------------------|--------------------------------| +| No external candidates | Skip Stage 2 | Zero | Flow 1 workloads | +| MDT secondary index enabled | Index lookup | O(candidates) | Flow 2 at any scale | +| No index, few candidates | Table scan | O(candidates * table) | Small tables, few shared blobs | +| No index, many candidates | Circuit break | Zero (deferred) | Large tables -- index required | + +```mermaid +sequenceDiagram + participant C as Cleaner (Stage 2) + participant SI as MDT Secondary Index + participant RI as MDT Record Index + + Note over C: Step 1: Batch prefix scan + C->>SI: All candidate paths (N paths, single call) + SI-->>C: Map> + + Note over C: Step 2: Batch record index lookup + C->>C: Collect all record keys from all candidates + C->>RI: readRecordIndexLocations(all record keys) + Note over RI: Sort keys → single sequential
forward-scan through HFile + RI-->>C: Map + + Note over C: Step 3: In-memory resolution + loop For each candidate path + alt No record keys for this path + Note right of C: Globally orphaned → DELETE + else Has record keys + C->>C: Check each location.fileId
 against cleaned_fg_ids (in-memory) + alt Any fileId NOT in cleaned_fg_ids + Note right of C: Live reference → RETAIN + else All in cleaned FGs + Note right of C: Globally orphaned → DELETE + end + end + end +``` + +### Stage 3: Container File Lifecycle + +Container files pack multiple blobs at different `(offset, length)` ranges. A container can only be +deleted when *all* ranges within it are unreferenced. + +``` +all_deletes = hudi_blob_deletes + external_deletes + +// Non-container blobs: each occupies an entire file -> delete directly +for ref in all_deletes where ref.offset is null: + blob_files_to_delete.add(ref.path) + +// Container blobs: group by path, check for remaining live ranges +for each container_path in (all_deletes where ref.offset is not null) grouped by path: + dead_ranges = ranges being deleted for this container + live_ranges = retained refs (from Stage 1) + referenced refs (from Stage 2) for this container + + if live_ranges is empty: + blob_files_to_delete.add(container_path) // entire container dead + else: + containers_for_compaction.add(container_path, dead_ranges) // partial -> repack +``` + +For Hudi-created containers, all ranges belong to the same FG (instant in path scopes it to a +single write), so retained refs from Stage 1 are sufficient -- no cross-FG check needed. + +### Execution Flow + +``` +1. CleanPlanActionExecutor.requestClean() + ├── hasBlobColumns(table)? // R7: zero-cost gate + ├── CleanPlanner: for each partition, for each file group: + │ ├── Refactored policy method -> FileGroupCleanResult + │ └── If hasBlobColumns: Stage 1 per FG + ├── CleanPlanner: replaced file groups -> Stage 1 + ├── If external_candidates non-empty: Stage 2 + ├── Stage 3: container lifecycle resolution + ├── Build HoodieCleanerPlan (+ blobFilesToDelete, containersToCompact) + └── Persist plan to timeline (REQUESTED state) + +2. 
CleanActionExecutor.runClean() + ├── Transition to INFLIGHT + ├── Delete file slices (existing, parallelized) + ├── Delete blob files (new, same parallelized pattern) + ├── Record containers for blob compaction (metadata only) + ├── Build HoodieCleanMetadata with blobCleanStats + └── Transition to COMPLETED +``` + +```mermaid +sequenceDiagram + participant P as CleanPlanActionExecutor + participant TL as Timeline + participant E as CleanActionExecutor + participant S as Storage + + Note over P: requestClean() + + P->>P: Stage 1: per-FG blob ref set difference + P->>P: Stage 2: MDT index lookup (if external candidates) + P->>P: Stage 3: container lifecycle resolution + P->>TL: Persist HoodieCleanerPlan + + Note over TL: REQUESTED + + rect rgb(255, 245, 230) + Note right of TL: Crash here → restart fresh
(no plan persisted yet) + end + + E->>TL: Transition plan state + Note over TL: INFLIGHT + + rect rgb(255, 245, 230) + Note right of TL: Crash here → re-execute plan
(idempotent: FileNotFound = success) + end + + par Parallel deletion + E->>S: Delete file slices (existing) + and + E->>S: Delete blob files (new) + end + + E->>E: Build HoodieCleanMetadata
(+ blobCleanStats) + E->>TL: Transition plan state + + rect rgb(255, 245, 230) + Note right of TL: Crash here → re-execute
(all deletes are no-ops) + end + + Note over TL: COMPLETED +``` + +--- + +## Integration with Existing Cleaner + +### CleanPlanner Refactoring + +The existing `CleanPlanner` policy methods produce `CleanFileInfo` objects (file paths to delete) +without exposing how the file slices were split into expired and retained sets, which blob cleanup +needs. We introduce a new return type: + +```java +public class FileGroupCleanResult { + private final List<CleanFileInfo> filePathsToDelete; + private final List<FileSlice> expiredSlices; + private final List<FileSlice> retainedSlices; + // Constructor and getters omitted for brevity. +} +``` + +The three policy methods (`getFilesToCleanKeepingLatestVersions`, +`getFilesToCleanKeepingLatestCommits`, `getFilesToCleanKeepingLatestHours`) are refactored to +collect both expired and retained slices alongside the existing `CleanFileInfo` production. The +existing behavior is unchanged -- the refactoring adds output without modifying the +expired/retained classification logic. + +### Replaced File Group Handling + +Replaced file groups (from clustering) are cleaned via `getReplacedFilesEligibleToClean()`. A +parallel method `getReplacedFileGroupBlobCleanResults()` produces `FileGroupCleanResult` objects +with `retainedSlices = empty` and `expiredSlices = all slices`. This feeds into Stage 1 identically +to normal file groups. + +### Schema Changes: HoodieCleanerPlan + +Two new nullable fields with null defaults (backward compatible): + +- **`blobFilesToDelete`**: `List` -- blob files where all ranges are dead. + The executor deletes these. +- **`containersToCompact`**: `List` -- containers with mixed live/dead + ranges, including the dead `(offset, length)` ranges. Handed to the blob compaction service. 
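
The two plan fields above are exactly the outputs of Stage 3, so the container decision that populates them can be sketched as below. This is a simplified illustration, not the planned implementation: the `Stage3` class is hypothetical, ranges are encoded as `"offset:length"` strings purely for brevity, and the set of containers with live ranges is assumed to have been derived from the Stage 1 retained refs and the Stage 2 lookups.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the Stage 3 delete-vs-compact decision.
final class Stage3 {
    final List<String> blobFilesToDelete = new ArrayList<>();
    // container path -> dead "offset:length" ranges (string encoding is illustrative only)
    final Map<String, List<String>> containersToCompact = new LinkedHashMap<>();

    // deadWholeFiles: non-container blobs confirmed dead by Stages 1 and 2.
    // deadRangesByContainer: dead ranges grouped by container path.
    // containersWithLiveRanges: containers that still hold at least one live range.
    static Stage3 resolve(List<String> deadWholeFiles,
                          Map<String, List<String>> deadRangesByContainer,
                          Set<String> containersWithLiveRanges) {
        Stage3 out = new Stage3();
        out.blobFilesToDelete.addAll(deadWholeFiles);  // whole-file blobs: delete directly
        for (Map.Entry<String, List<String>> e : deadRangesByContainer.entrySet()) {
            if (containersWithLiveRanges.contains(e.getKey())) {
                // Partial container: keep the file and flag it for blob compaction.
                out.containersToCompact.put(e.getKey(), e.getValue());
            } else {
                // Every range is dead: the whole container can be deleted.
                out.blobFilesToDelete.add(e.getKey());
            }
        }
        return out;
    }
}
```

A container never appears in both outputs, which keeps the executor's delete step independent of the compaction hand-off.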
+ +### Schema Changes: HoodieCleanMetadata + +A new nullable field `blobCleanStats` of type `HoodieBlobCleanStats`: + +- `totalBlobFilesDeleted`, `totalBlobFilesRetained`, `totalContainersFlaggedForCompaction` +- `totalBlobStorageReclaimed` +- `deletedBlobFilePaths`, `failedBlobFilePaths` + +### hasBlobColumns() Gate + +An in-memory schema check (`TableSchemaResolver.getTableSchema().containsBlobType()`) gates all +blob cleanup logic. Requires making `containsBlobType()` public (one-line visibility change). + +--- + +## Concurrency & Safety + +### Writer-Cleaner Race: Two-Sided Guard + +**Hudi-created blobs (Flow 1): structurally impossible.** A new write creates new blob files with a +new instant in the path (C11). An UPSERT carries forward blob refs from retained slices (already +live). There is no mechanism for a writer to reference a Hudi-created blob that exists only in +expired slices. No writer-side check is needed for Flow 1. + +**External blobs (Flow 2): writer-side conflict check in `preCommit()`.** The gap between the +cleaner's planning-time snapshot and its actual file deletion is closed by a commit-time conflict +check: + +1. Writers track external managed blob paths in `HoodieWriteStat.externalBlobPaths` (in-memory + collection, no additional I/O). +2. At commit time (in `preCommit()`, under the existing transaction lock), the writer checks all + three clean states -- COMPLETED, INFLIGHT, and REQUESTED -- because a REQUESTED plan can begin + executing at any moment (the REQUESTED→INFLIGHT transition doesn't acquire the transaction + lock). It checks `deletedBlobFilePaths` (COMPLETED) and `blobFilesToDelete` (INFLIGHT/REQUESTED). +3. If any overlap is found, the commit is rejected with `HoodieWriteConflictException` and the + writer retries. + +Cost is zero for non-blob tables and Flow 1. For Flow 2: one timeline scan + 1-3 metadata reads. 
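
The commit-time check in steps 2 and 3 reduces to a set intersection between the writer's external blob paths and the paths named by any clean action. The sketch below is a minimal illustration under stated assumptions: `BlobConflictCheck` and its nested `WriteConflictException` are hypothetical stand-ins (the RFC names `HoodieWriteConflictException`), and the caller is assumed to have already read the clean plans and metadata from the timeline while holding the transaction lock.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the writer-side conflict check for Flow 2.
final class BlobConflictCheck {
    // Stand-in for HoodieWriteConflictException so the sketch is self-contained.
    static final class WriteConflictException extends RuntimeException {
        WriteConflictException(String msg) {
            super(msg);
        }
    }

    // writerPaths: external blob paths the writer is about to reference.
    // cleanerPaths: union of deletedBlobFilePaths (COMPLETED) and
    //               blobFilesToDelete (INFLIGHT / REQUESTED).
    static void check(Collection<String> writerPaths, Collection<String> cleanerPaths) {
        Set<String> overlap = new HashSet<>(writerPaths);
        overlap.retainAll(new HashSet<>(cleanerPaths));
        if (!overlap.isEmpty()) {
            throw new WriteConflictException(
                "Blob paths deleted or scheduled for deletion: " + overlap);
        }
    }
}
```

On a conflict the writer would retry, as described in step 3; the retry policy itself is outside this sketch.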
+ +```mermaid +sequenceDiagram + participant W as Writer + participant TL as Timeline + participant CL as Cleaner + + Note over W,CL: Scenario A: Writer commits BEFORE cleaner plans + + W->>TL: Commit (references blob_X) + CL->>TL: Plan cleanup + Note right of CL: Sees blob_X in retained slice → not deleted + Note over W,CL: ✓ Safe + + Note over W,CL: Scenario B: Writer commits AFTER cleaner plans, BEFORE delete + + CL->>TL: Plan cleanup (blob_X in blobFilesToDelete) + Note over TL: REQUESTED / INFLIGHT + W->>TL: preCommit() -- reads clean plan + Note left of W: Intersection found!
HoodieWriteConflictException
→ Writer retries + Note over W,CL: ✓ Safe -- conflict detected + + Note over W,CL: Scenario C: Writer commits AFTER cleaner deletes, BEFORE COMPLETED + + CL->>CL: Delete blob_X from storage + Note over TL: Still INFLIGHT + W->>TL: preCommit() -- reads INFLIGHT plan + Note left of W: blob_X in blobFilesToDelete
→ Rejection + Note over W,CL: ✓ Safe -- same as B + + Note over W,CL: Scenario D: Cleaner completes, THEN writer acquires lock + + CL->>TL: Transition to COMPLETED + W->>TL: preCommit() -- reads COMPLETED metadata + Note left of W: blob_X in deletedBlobFilePaths
→ Rejection + Note over W,CL: ✓ Safe +``` + +### Concurrency Matrix + +| Operation | Concurrent with Blob Cleaner | Safety Mechanism | +|--------------------------------|------------------------------|-----------------------------------------------------------| +| Regular write (INSERT/UPSERT) | Safe | C11 path uniqueness (Flow 1); writer-side check (Flow 2) | +| Compaction | Safe | `isFileSliceNeededForPendingMajorOrMinorCompaction` | +| Clustering | Safe | Replaced FG lifecycle; Stage 2 for external blobs | +| Rollback | Safe | MOR over-retention; clean operates on post-rollback state | +| Savepoint create/delete | Safe | `isFileSliceExistInSavepointedFiles` | +| Archival | No interaction | Blob cleaner reads file slices, not commit metadata | +| Another cleaner instance | Safe | `TransactionManager`; `checkIfOtherWriterCommitted` | +| Blob compaction | Safe | Independent lifecycle | +| MDT writes (index maintenance) | Safe | MDT commit atomicity | + +### Crash Recovery + +Crash recovery is idempotent by construction, using the same mechanisms as existing file slice +cleaning: + +| Crash point | Recovery | +|-----------------------------------------|--------------------------------------------------------------------------------------------------------| +| During planning (before plan persisted) | No REQUESTED instant on timeline. Cleaner starts fresh. | +| After plan persisted, before execution | REQUESTED instant found; plan re-read and executed. | +| During execution (partial deletes) | INFLIGHT instant re-executed. Already-deleted files return FileNotFoundException → treated as success. | +| After execution, before COMPLETED | INFLIGHT re-executed. All deletes are no-ops. Metadata written, instant transitions to COMPLETED. 
| + +--- + +## Performance + +### Cost Summary + +| Workload | Stage 1 cost | Stage 2 cost | Total per cleanup cycle | +|--------------------------|---------------------------------|----------------------------|--------------------------------| +| Non-blob table | Zero (`hasBlobColumns` gate) | N/A | **Zero** | +| Flow 1 (Hudi-created) | ~6 Parquet reads per cleaned FG | Skipped | O(cleaned_FGs * slices_per_FG) | +| Flow 2 (external, index) | ~6 Parquet reads per cleaned FG | O(candidates) amortized | O(cleaned_FGs + candidates) | +| Flow 2 (external, scan) | ~6 Parquet reads per cleaned FG | O(candidates * table_size) | Circuit breaker limits this | + +### Back-of-Envelope: Example 7 (50K FGs, 2K External Candidates) + +| Parameter | Value | Notes | +|-------------------------------------|-----------|------------------------------------------------------| +| FGs cleaned this cycle | 500 | 1% of table | +| Stage 1: reads per FG | ~6 | 3 retained + 3 expired slices | +| Stage 1: total reads | 3,000 | Parallelized across executors, ~20s | +| External blob candidates | 2,000 | Locally orphaned in cleaned FGs | +| Avg refs per candidate | 3 | Illustrative assumption | +| Total record keys | 6,000 | 2,000 * 3 | +| **Stage 2 cost (estimated)** | | | +| Step 1: batched prefix scan | 1 call | Returns 6K record keys, ~2-5s estimated | +| Step 2: batched record index lookup | 1 call | 6K keys sorted, single HFile scan, ~1-2s estimated | +| Step 3: in-memory resolution | 6K checks | Hash set lookups against cleaned_fg_ids, ~0ms | +| **Total Stage 2** | **~3-7s** | Estimated; see I/O assumptions in Stage 2 cost model | +| Comparison: naive full-table scan | 12.5TB | 50K FGs * 5 slices * 50MB = prohibitive | + +### Memory Budget + +Per-FG blob ref sets: ~100MB peak (500K records * 100 bytes/ref = ~50MB each for the expired and +retained sets). FGs are processed sequentially within each partition batch -- per-FG sets are +computed and discarded, not accumulated. 
Only the output lists (`hudi_blob_deletes`, `external_candidates`) grow, containing +only orphaned refs (much smaller). Peak heap: ~100MB * `cleanerParallelism` = 400MB-1.6GB. + +--- + +## Configuration + +| Property | Default | Description | +|----------------------------------------------------|---------|-----------------------------------------------------------------------------------| +| `hoodie.cleaner.blob.enabled` | `true` | Enable blob cleanup during clean action | +| `hoodie.cleaner.blob.dry.run` | `false` | Compute blob cleanup plan and log results but do not execute | +| `hoodie.cleaner.blob.external.scan.parallelism` | `10` | Parallelism for Stage 2 fallback table scan | +| `hoodie.cleaner.blob.external.scan.max.candidates` | `1000` | Circuit breaker for Stage 2 fallback scan; exceeding defers external blob cleanup | +| `hoodie.metadata.index.secondary.column` | (none) | Set to `.reference.external_path` for Flow 2 cross-FG verification | + +--- + +## Rollout / Adoption Plan + +Each stage can be implemented, tested, and shipped independently once the foundation layer is in +place (see [Independent Implementability](#independent-implementability)). + +**Foundation (shared prerequisite).** `CleanPlanner` refactoring (policy methods return +`FileGroupCleanResult`), `BlobRef` type, schema changes (nullable `blobFilesToDelete` and +`containersToCompact` fields), and the `hasBlobColumns` zero-cost gate. + +**Stage 1 (per-FG cleanup).** Set-difference logic and dispatch by blob category. Produces +`hudi_blob_deletes` (immediate) and `external_candidates` (for Stage 2). + +**Stage 2 (cross-FG verification) -- priority.** Flow 2 (external blobs) is the primary initial +use case -- cross-FG verification prevents premature deletion of shared blobs. Requires MDT + +record index + secondary index on `reference.external_path` (P6). Includes fallback table scan +with circuit breaker. + +**Stage 3 (container lifecycle).** Delete-entire-file vs. 
flag-for-compaction at the container +level. Needed only when container files are used. + +**Writer-side conflict check.** `preCommit()` conflict check for Flow 2 concurrency safety. +Closes the writer-cleaner race window. Independent of the three stages. + +### Backward Compatibility + +- All schema changes use nullable fields with null defaults. Existing clean plans and metadata + are unaffected. +- `hasBlobColumns()` gate ensures zero behavioral change for non-blob tables. +- One prerequisite code change: `HoodieSchema.containsBlobType()` visibility from package-private + to public (one-line change, no behavioral impact). + +--- + +## Test Plan + +### Unit Tests + +- **Stage 1 set-difference:** Verify correct orphan identification for COW and MOR file groups, + including MOR over-retention (shadowed base refs kept until post-compaction). +- **Stage 1 dispatch:** Verify Hudi-created blobs route to `hudi_blob_deletes` and external blobs + route to `external_candidates`. +- **Stage 2 index lookup:** Verify short-circuit behavior (stop after first live reference), empty + results (globally orphaned), and batched prefix scans. +- **Stage 2 fallback:** Verify table scan correctness and circuit breaker activation. +- **Stage 3 container lifecycle:** Verify delete-all-dead vs. flag-for-compaction decisions. +- **Writer-side conflict check:** Verify detection of conflicts with COMPLETED, INFLIGHT, and + REQUESTED clean actions. + +### Integration Tests + +- End-to-end clean cycle with Hudi-created blob table (COW and MOR). +- End-to-end clean cycle with external blob table and MDT secondary index. +- Clean cycle with replaced file groups (post-clustering). + +### Concurrency Tests + +- Writer-cleaner race scenarios A-D (from concurrency analysis) with external blobs. +- Concurrent clean + compaction with blob tables. + +### Backward Compatibility + +- Non-blob table clean cycle produces identical behavior (no `blobFilesToDelete`, no + `blobCleanStats`). 
+- Clean plan deserialization with and without blob fields (nullable field compatibility). + +--- + +## Appendix + +- **[Problem Statement, Constraints & Requirements](rfc-100-blob-cleaner-problem.md)** + -- Complete problem scope, all 13 constraints (C1-C13), all 10 requirements (R1-R10), 8 + illustrative failure mode examples, and open questions. diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-problem.md b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md new file mode 100644 index 0000000000000..61b2f79f5e16f --- /dev/null +++ b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md @@ -0,0 +1,677 @@ +# Blob Cleaner: Problem Statement + +## 1. Goal + +When old file slices are cleaned, out-of-line blob files they reference may become orphaned -- still +consuming storage but unreachable by any query. The blob cleaner must identify and delete these +unreferenced blob files without premature deletion (deleting a blob that is still referenced by a live +record). This document defines the problem scope, design constraints, requirements, and illustrative +failure modes. It contains no solution content. + +--- + +## 2. Scope + +### In scope + +- Cleanup of **out-of-line blob files** when references to them exist only in expired (cleaned) file + slices. +- All table types: **COW** and **MOR**. +- All cleaning policies: `KEEP_LATEST_COMMITS`, `KEEP_LATEST_FILE_VERSIONS`, + `KEEP_LATEST_BY_HOURS`. +- Interaction with table services: **compaction**, **clustering**, **blob compaction**. +- Interaction with timeline operations: **savepoints**, **rollback**, **archival**. +- Single-writer and multi-writer (OCC) concurrency modes. +- Both **Hudi-created blobs** (stored under `{table}/.hoodie/blobs/...`) and **user-provided + external blobs** (arbitrary paths). + +### Two entry flows + +Blob cleanup must support two distinct entry flows. These are not edge cases of each other -- +they are co-equal paths with different properties, different volumes, and different cleanup costs. 
+ +**Flow 1: Path-dispatched (Hudi-created blobs).** Blobs created by Hudi's write path and stored +under `{table}/.hoodie/blobs/{partition}/{col}/{instant}/{blob_id}`. The path structure guarantees +uniqueness (C11), file-group scoping, and eliminates cross-FG sharing for normal writes. This is the +expected majority flow for Phase 3 workloads. + +**Flow 2: Non-path-dispatched (user-provided external blobs).** Users have existing blob files in +external storage (e.g., `s3://media-bucket/videos/`, a shared NFS mount, or any user-controlled +path). Records reference these blobs directly by path. The user does **not** want to bootstrap -- +they do not want Hudi to copy, move, or reorganize the blob files into `.hoodie/blobs/`. Hudi +manages the *references*, not the *storage layout*. This is the expected primary flow for Phase 1 +workloads and remains a supported flow in Phase 3. + +The non-path-dispatched flow has fundamentally different properties: + +| Property | Path-dispatched (Hudi-created) | Non-path-dispatched (external) | +|---------------------------|-----------------------------------|--------------------------------------| +| Path uniqueness | Guaranteed (instant in path, C11) | Not guaranteed (user controls) | +| Cross-FG sharing | Does not occur (FG-scoped) | Common (multiple records, same blob) | +| Writer/cleaner race | Cannot occur (D2) | Can occur (D3) | +| Delete-and-re-add (C2) | Eliminated | Real concern | +| Volume | Scales with writes | Can be large from day one | +| Per-FG cleanup sufficient | Yes | No -- cross-FG verification needed | + +Any solution that treats the non-path-dispatched flow as a rare edge case will fail at scale for +Phase 1 workloads. The cleanup algorithm must be efficient for **both** flows independently, and +must not impose the cost structure of one flow on the other. + +### Out of scope + +- **Inline blobs.** Inline blob data lives inside the base/log file and is deleted when the file + slice is cleaned. 
No additional cleanup needed. +- **Blob compaction internals.** Blob compaction (repacking partially-live container files) is a + separate service. This document defines the interface point (when to hand off to blob compaction) + but not its internal design. +- **Schema evolution.** Adding or removing blob columns does not change the cleanup problem. + +### Stance on the `managed` flag + +The BlobReference schema includes a `managed` boolean field +(`HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED`). The RFC states that only managed blobs are +cleaned. This document acknowledges the flag and treats it as a **filter** -- unmanaged blobs are +excluded from cleanup consideration. However, the cleanup design must be **correct regardless of the +flag's value**. The flag selects *which* blobs enter the cleanup pipeline; it must not be used as a +correctness lever within the pipeline itself. The flag may later serve as an optimization (skip +cleanup work for unmanaged blobs), but the problem statement and any solution must not depend on it +for safety. + +--- + +## 3. Background: Existing Cleaner + +The existing Hudi cleaner provides the execution framework that blob cleanup must integrate with. + +### Plan-execute model + +Cleaning is a two-phase operation: + +1. **Plan** (`CleanPlanner`): For each partition and file group, determine which file slices are + expired based on the cleaning policy. Produce a `HoodieCleanerPlan` listing file paths to delete. +2. **Execute** (`CleanActionExecutor`): Delete the files listed in the plan. Record results in + `HoodieCleanMetadata` on the timeline. + +### Per-partition, per-file-group iteration + +`CleanPlanner.getDeletePaths(partitionPath, earliestCommitToRetain)` iterates file groups within a +partition. For each file group, it compares file slices against the retention policy and produces a +list of `CleanFileInfo` objects (file paths to delete). The cleaner has no concept of cross-file-group +dependencies. 
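The per-file-group shape of this iteration can be pictured with a toy model. This is only a sketch under stated assumptions: `plan_deletes`, the dictionary layout, and the "keep the newest N slices" rule are hypothetical stand-ins for illustration, not `CleanPlanner` code. It shows that each file group is decided in isolation, with no view of any other file group.

```python
def plan_deletes(file_groups, versions_to_keep):
    """Toy per-file-group planner (hypothetical, not Hudi's API).

    file_groups maps a file group id to its slice paths, ordered oldest to newest.
    versions_to_keep must be >= 1.
    """
    plan = {}
    for fg_id, slices in file_groups.items():
        # Each file group is evaluated on its own; nothing here looks at other groups.
        expired = slices[:-versions_to_keep]
        if expired:
            plan[fg_id] = expired
    return plan


partition = {
    "FG-1": ["slice@t1", "slice@t2", "slice@t3"],
    "FG-2": ["slice@t2"],
}
delete_plan = plan_deletes(partition, versions_to_keep=2)
```

Because the loop body only ever sees one file group's slices, any reference that crosses file groups is invisible to a planner of this shape.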
+ +### Savepoint awareness + +The cleaner collects all savepointed timestamps and their associated data files. File slices that +overlap with savepointed files are excluded from cleaning +(`isFileSliceExistInSavepointedFiles`). This preserves the savepoint invariant: a savepoint freezes a +consistent snapshot including all data files it references. + +### OCC conflict resolution + +`SimpleConcurrentFileWritesConflictResolutionStrategy` resolves write-write conflicts at the +`(partition, fileId)` granularity. There is no global serialization point. Concurrent writers to +different file groups proceed without contention. + +### Critical gap + +The existing cleaner operates on file paths (base files + log files) within a single file group. It +has **no concept of transitive references** -- it does not know that a file slice contains pointers +to external blob files that may need separate cleanup. Blob cleanup requires extending the cleaner +to follow these references and determine blob-level liveness. + +--- + +## 4. Design Constraints + +Each constraint is a fact about the Hudi system that any blob cleanup solution must respect. Violating +any constraint leads to data corruption, premature deletion, or permanent orphans. + +### C1: Blob immutability + +Once a blob file is written, its content never changes. Blob files are append-once, read-many. This +means a blob file's identity is stable for its entire lifetime. + +*Source: RFC-100 blob cleaner design, general storage semantics.* + +### C2: Delete-and-re-add same path + +A blob file can be deleted from storage and a new file created at the same path with different +content. This is a real concern for user-provided external blobs (the user controls the path). For +Hudi-created blobs, it is structurally eliminated by C11 (instant in path guarantees uniqueness). 
+ +*Source: RFC-100 blob cleaner design; alternatives analysis constraint C2.* + +### C3: Cross-file-group blob sharing + +An out-of-line blob can be referenced by records in multiple file groups and multiple partitions. This +is explicitly supported for user-provided external blobs: two records in different file groups can +point to the same external file. For Hudi-created blobs, cross-FG sharing does not occur because the +blob is created within a specific file group's storage scope (see C11). However, after clustering +(C8), references to the same Hudi-created blob could temporarily exist in both the source and target +file groups until the source is cleaned. + +*Source: RFC-100 lines 196-198 (Option 1 scans all active file slices); alternatives analysis F6.* + +### C4: Container files + +Multiple blobs can be packed into a single container file, distinguished by `(offset, length)` within +the BlobReference. A container file can only be deleted when **all** byte ranges within it are +unreferenced. If some ranges are orphaned but others are still live, the container cannot be deleted -- +it must be handed off to blob compaction for repacking. + +*Source: BlobReference schema fields `offset` and `length`; RFC-100 lines 164-165 (container config); +alternatives analysis F1.* + +### C5: MOR log updates shadow base file blob refs + +In MOR tables, a log file update to a record's blob reference supersedes the base file's blob +reference for that record. The base file's blob ref appears live (it exists in an active file slice) +but is actually dead (the log update replaced it). Reading only the base file produces a **superset** +of live references. Over-retention (keeping the shadowed blob longer) is safe. Under-retention +(treating the log-shadowed base ref as already cleaned) would cause premature deletion if the log +update is later rolled back. 
+ +*Source: RFC-100 line 122 (merge mode determines which blob reference is returned); MOR semantics.* + +### C6: Existing cleaner is per-file-group scoped + +`CleanPlanner` iterates per `HoodieFileGroup` within each partition. It determines expired file slices +within a single file group. There is no existing mechanism to evaluate cross-file-group dependencies +during cleaning. + +*Source: `CleanPlanner.getDeletePaths()`, `CleanPlanner.getFilesToCleanKeepingLatestCommits()`; +alternatives analysis F11.* + +### C7: OCC is per-file-group (no global contention allowed) + +Concurrent writer conflict resolution operates at `(partition, fileId)` granularity. Any solution that +introduces a global contention point (global counter, global lock, global bitmap) violates this +constraint and degrades write throughput under concurrency. + +*Source: `SimpleConcurrentFileWritesConflictResolutionStrategy`; alternatives analysis F12.* + +### C8: Clustering moves blob refs between file groups + +Clustering reads records from source file groups and rewrites them to target file groups. For +Hudi-managed blobs, clustering creates **new** blob files in the target file group. For external +blobs, clustering copies the pointer (same path, same offset/length) to the target file group. After +clustering, the source file group's slices still reference the original blobs until those slices are +cleaned. The target file group's slices reference either new blobs (Hudi-managed) or the same +external blobs. + +*Source: RFC-100 lines 212-214.* + +### C9: Savepoints freeze file slices and their blob refs + +A savepoint preserves a consistent snapshot. File slices covered by a savepoint are excluded from +cleaning. This means any blob referenced by a savepointed file slice must also be preserved, even if +the blob would otherwise be considered orphaned. The cleaner already handles savepoint exclusion for +file slices; blob cleanup must extend this guarantee to the blobs they reference. 
+ +*Source: `CleanPlanner.savepointedTimestamps`, `isFileSliceExistInSavepointedFiles()`.* + +### C10: Rollback can invalidate or resurrect references + +Rolling back a commit can remove file slices that were the sole reference to a blob (the blob becomes +orphaned). Conversely, rolling back a commit that updated a record's blob reference can resurrect the +previous reference (an older blob that appeared orphaned is now live again). Any blob cleanup solution +must account for both directions. + +*Source: Hudi rollback semantics; timeline management.* + +### C11: Hudi-created blob paths include instant (structurally unique) + +Hudi-created blob files are stored at +`{table_path}/.hoodie/blobs/{partition}/{column_name}/{instant}/{blob_id}`. Because the commit +instant is embedded in the path, two different writes always produce different blob paths. This +eliminates the delete-and-re-add problem (C2) for Hudi-created blobs and means they are inherently +scoped to a single file group's write context. + +*Source: RFC-100 line 170; alternatives analysis F3.* + +### C12: Archival removes commit metadata from active timeline + +Hudi's archival process moves completed commits from the active timeline to the archived timeline. +If blob cleanup depends on information in commit metadata (e.g., which blobs were written by a +commit), that information becomes unavailable after archival unless it is persisted elsewhere. The +cleaner must either complete blob reference resolution before archival, or ensure the necessary +information survives archival. + +*Source: Hudi archival semantics; `HoodieActiveTimeline` vs `HoodieArchivedTimeline`.* + +### C13: Non-path-dispatched blobs require cross-FG verification at scale + +For the non-path-dispatched flow (Flow 2), cross-file-group blob sharing (C3) is the **common +case**, not an edge case. 
Users referencing external blobs (e.g., a shared media library) will +frequently have multiple records across different file groups and partitions pointing to the same +blob file. Any cleanup algorithm that treats cross-FG verification as a rare fallback will impose +disproportionate cost on Flow 2 workloads. The cross-FG verification path must be designed for +volume, not just correctness. + +*Source: Two entry flows (Section 2); C3; alternatives analysis D1, D3.* + +--- + +## 5. Requirements + +### R1: No premature deletion (hard invariant) + +A blob file must not be deleted while any live record still references it. This is the single most +critical requirement. A premature deletion causes silent data corruption: queries return null or error +for the affected records, and the data is unrecoverable. + +### R2: No permanent orphans (bounded cleanup) + +Every orphaned blob must eventually be cleaned. The number of cleanup cycles required to reclaim an +orphan must be bounded (e.g., cleaned within N cleaner invocations after the last referencing file +slice is expired). Unbounded accumulation of orphaned blobs wastes storage indefinitely. + +### R3: Container awareness (range-level liveness) + +Cleanup must track liveness at the `(path, offset, length)` tuple level, not just the path level. +A container file may have some live ranges and some dead ranges. Only when all ranges are dead can the +container file be deleted. Partially-dead containers should be flagged for blob compaction. + +### R4: MOR correctness + +For MOR tables, blob cleanup must be safe in the presence of log updates that shadow base file blob +references. Over-retention (keeping a shadowed blob until post-compaction) is acceptable. +Under-retention (prematurely deleting a blob whose reference appears shadowed but could be resurrected +by rollback) is not. + +### R5: Concurrency safety (no global serialization) + +Blob cleanup must not introduce global contention points. 
Write throughput for tables with blobs must +not degrade compared to tables without blobs under concurrent writers. Per-file-group scoping (C7) +must be preserved. + +### R6: Scale proportional to work, not table size + +For path-dispatched blobs (Flow 1): the cost of blob cleanup must be proportional to the number of +file groups being cleaned, not the total table size. A table with 100K file groups cleaning 1K of +them must not scan all 100K file groups. + +For non-path-dispatched blobs (Flow 2): cross-FG verification is required (C13), but the cost must +be proportional to the number of **candidate blobs requiring verification**, not the total number of +active file slices in the table. A table with 100K file groups where 50 external blob candidates +need cross-FG verification must not scan all 100K file groups -- it must use targeted lookups or +indexes to resolve those 50 candidates efficiently. + +### R7: No cost for non-blob tables + +Tables without blob columns must pay zero additional cost. The blob cleanup path must not be entered +if no blob columns exist. This includes no additional metadata, no additional timeline entries, and no +additional I/O. + +### R8: All cleaning policies supported + +Blob cleanup must work correctly under all three cleaning policies: `KEEP_LATEST_COMMITS`, +`KEEP_LATEST_FILE_VERSIONS`, and `KEEP_LATEST_BY_HOURS`. The blob cleanup logic should be +policy-agnostic -- it operates on the set of expired vs. retained file slices determined by the +policy, not on the policy itself. + +### R9: Crash safety and idempotency + +If the cleaner crashes after planning but before completing all deletions, restarting must be safe. +Blob deletions must be idempotent (deleting an already-deleted file is a no-op, not an error). +The cleaner plan must include enough information to resume blob cleanup without re-reading expired +file slices (which may no longer exist after a partial execution). 
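A minimal sketch of what R9 asks for, assuming the plan persists the blob paths themselves so that a retry never needs the expired file slices. The function name `execute_blob_deletes` and the use of the local filesystem in place of object storage are assumptions made for illustration; this is not the executor's actual code.

```python
import os
import tempfile


def execute_blob_deletes(planned_paths):
    """Replay a persisted delete plan; a missing file counts as already done."""
    deleted, already_gone = [], []
    for path in planned_paths:
        try:
            os.remove(path)
            deleted.append(path)
        except FileNotFoundError:
            # An earlier, partially completed run already removed this blob.
            already_gone.append(path)
    return deleted, already_gone


# Build a three-blob plan, simulate a crash after the first delete, then retry.
workdir = tempfile.mkdtemp()
plan = [os.path.join(workdir, name) for name in ("blob_a.bin", "blob_b.bin", "blob_c.bin")]
for path in plan:
    with open(path, "wb") as handle:
        handle.write(b"payload")

os.remove(plan[0])  # the "crashed" first attempt got this far
deleted, already_gone = execute_blob_deletes(plan)      # retry replays the whole plan
retry_deleted, retry_gone = execute_blob_deletes(plan)  # a further retry is a no-op
```

Replaying the full plan rather than tracking progress keeps recovery stateless: the plan is the only input, and every outcome of a delete attempt is acceptable.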
+ +### R10: Observability + +Blob cleanup must report metrics: number of blob files deleted, number of blob files retained +(over-retained due to MOR or containers), number of container files flagged for blob compaction, +and total storage reclaimed. These metrics enable operators to understand blob storage growth and +cleanup effectiveness. + +--- + +## 6. Illustrative Examples + +Each example demonstrates a specific failure mode. These are not exhaustive -- they are designed to +make the constraints and requirements concrete. + +### Example 1: Cross-file-group sharing -- per-FG cleanup deletes shared blob + +**Demonstrates:** C3, C6, R1 + +``` +Setup: + Partition P1, File Group FG-1: + Slice @t1: row1.blob_ref = (s3://shared/video.mp4, 0, 10MB, managed=true) + + Partition P2, File Group FG-2: + Slice @t1: row2.blob_ref = (s3://shared/video.mp4, 0, 10MB, managed=true) + +Action: + Cleaner expires FG-1's slice @t1 (no retained slices in FG-1). + +Per-FG cleanup (incorrect): + FG-1 expired refs = {(s3://shared/video.mp4, 0, 10MB)} + FG-1 retained refs = {} + Orphaned within FG-1 = {(s3://shared/video.mp4, 0, 10MB)} + -> DELETE s3://shared/video.mp4 + +Result: + FG-2 still has an active slice @t1 referencing video.mp4. + Query on row2 -> FILE NOT FOUND. Data corruption. + +Correct behavior: + Before deleting, verify that no other active file slice in any file group + references (s3://shared/video.mp4, 0, 10MB). FG-2's active slice references + it, so the blob must be retained. +``` + +### Example 2: Container file partial liveness -- deleting container destroys live ranges + +**Demonstrates:** C4, R3 + +``` +Setup: + File Group FG-1: + Slice @t1: row1.blob_ref = (container_A.bin, 0, 1MB, managed=true) + Slice @t2: row2.blob_ref = (container_A.bin, 1MB, 2MB, managed=true) + +Action: + Cleaner expires slice @t1, retains slice @t2. 
+ +Path-level cleanup (incorrect): + Expired paths = {container_A.bin} + Retained paths = {container_A.bin} + container_A.bin is in both sets -> retain. + (This happens to be safe by accident, but only because the same path appears.) + + Alternative scenario -- different slices, same container: + Slice @t1 in FG-1: row1.blob_ref = (container_A.bin, 0, 1MB) -- expired + Slice @t1 in FG-2: row2.blob_ref = (container_A.bin, 1MB, 2MB) -- retained + + If FG-1 concludes container_A.bin is orphaned (no retained refs within FG-1): + -> DELETE container_A.bin + FG-2's live range at offset 1MB is destroyed. + +Correct behavior: + Track liveness at (path, offset, length). Only delete the container file when + ALL ranges are unreferenced. If some ranges are dead and others live, flag the + container for blob compaction instead of deleting it. +``` + +### Example 3: Delete-and-re-add -- path reuse causes identity confusion + +**Demonstrates:** C2, R1 + +``` +Setup: + At time t1: User writes row1.blob_ref = (s3://user/photo.jpg, managed=true) + At time t2: User deletes the file at s3://user/photo.jpg externally + At time t3: User writes row2.blob_ref = (s3://user/photo.jpg, managed=true) + (new file at same path, different content) + +Cleanup scenario: + Cleaner expires slice @t1. Slice @t3 is retained. + Expired refs = {s3://user/photo.jpg} + Retained refs = {s3://user/photo.jpg} + Same path in both sets -> retain. (Correct by coincidence.) + + But consider: if the cleaner had cached blob identity by path and assumed + "same path = same blob," it would not detect that the t1 and t3 references + point to different physical content. + + Edge case: if t3 is also expired and t1 is the only reference, the cleaner + would correctly delete. But if a new writer at t4 references the same path + AGAIN (third incarnation), the cleaner's identity model must not confuse the + three incarnations. 
+ +Note: This problem does not arise for Hudi-created blobs (C11 -- instant in +path guarantees each write produces a unique path). +``` + +### Example 4: MOR log shadow -- base file ref appears live when superseded + +**Demonstrates:** C5, R4 + +``` +Setup (MOR table): + File Group FG-1: + Base file @t1: row1.blob_ref = (blob_A.bin, managed=true) + Log file @t2: row1.blob_ref = (blob_B.bin, managed=true) -- update + + After merge: row1's effective blob_ref is blob_B.bin. + blob_A.bin is no longer referenced by any live record. + +Cleanup scenario (pre-compaction): + Cleaner does not expire slice @t1 (it's retained). + Reading blob refs from the retained slice: + Base @t1: {blob_A.bin} + Log @t2: {blob_B.bin} + Union: {blob_A.bin, blob_B.bin} + + blob_A.bin appears live (it's in the retained set) even though it's been + superseded by the log update. This is over-retention -- safe but wasteful. + +After compaction: + Compacted base @t3: row1.blob_ref = (blob_B.bin, managed=true) + Now the only retained ref is {blob_B.bin}. + blob_A.bin is no longer in any retained set -> eligible for deletion. + +Why over-retention is the correct default: + If the log file @t2 is rolled back, row1 reverts to blob_A.bin from the base + file. If blob_A.bin had been prematurely deleted, the rollback produces a + dangling reference. Over-retention prevents this. +``` + +### Example 5: Writer-cleaner race -- three scenarios + +**Demonstrates:** C7, R1, R5 + +``` +A writer and cleaner operate concurrently on the same table. + +Scenario A: Writer commits BEFORE cleaner's timeline fence + t1: Writer starts, references blob_X + t2: Writer commits (blob_X is now in a retained slice) + t3: Cleaner plans cleanup + t4: Cleaner checks timeline fence -- sees writer's commit at t2 + t5: Cleaner removes blob_X from orphan candidates + -> Safe. Timeline fence catches the new reference. 
+ +Scenario B: Writer commits AFTER cleaner's timeline fence, BEFORE delete + t1: Cleaner plans cleanup, blob_X is a candidate for deletion + t2: Cleaner checks timeline fence -- no new commits + t3: Writer commits, referencing blob_X + t4: Cleaner deletes blob_X + -> UNSAFE. The timeline fence did not see the writer's commit. + blob_X is deleted, but the writer's new slice references it. + +Scenario C: Writer commits AFTER cleaner deletes + t1: Cleaner plans and executes, deletes blob_X + t2: Writer commits, referencing blob_X (e.g., user-provided external path) + -> UNSAFE. The blob is already gone. The writer's commit creates a dangling + reference. + +Note: Scenarios B and C cannot occur for Hudi-created blobs (C11 + D2 from +alternatives analysis: new writes always produce new paths, and UPSERT carries +forward refs from retained slices). They are real concerns for user-provided +external blobs where the user can reference any path. +``` + +### Example 6: Clustering moves refs -- replaced FG appears to have no retained slices + +**Demonstrates:** C8, C3, R1 + +``` +Setup: + File Group FG-1: + Slice @t1: row1.blob_ref = (blob_A.bin, managed=true) + + Clustering at t2 rewrites FG-1's records to FG-2: + File Group FG-2: + Slice @t2: row1.blob_ref = (blob_A_new.bin, managed=true) + (Hudi-managed: new blob created in FG-2's scope) + + FG-1 is now a replaced file group. Its slice @t1 is eligible for cleaning + after the retention policy expires. + +Cleanup: + Cleaner cleans FG-1's slice @t1. + Expired refs = {blob_A.bin} + Retained refs within FG-1 = {} (FG-1 has no retained slices -- it's replaced) + blob_A.bin appears orphaned within FG-1 -> DELETE + + This is actually CORRECT for Hudi-managed blobs: clustering created a new + blob (blob_A_new.bin) in FG-2, so blob_A.bin is genuinely orphaned. 
+ + BUT if the blob were an external user-provided blob (same path referenced + from both FG-1 and FG-2 after clustering): + + File Group FG-2: + Slice @t2: row1.blob_ref = (s3://ext/video.mp4, managed=true) + (external: pointer copied, same blob) + + FG-1's cleanup concludes s3://ext/video.mp4 is orphaned within FG-1. + Deleting it destroys FG-2's live reference. + +Correct behavior: + For Hudi-managed blobs, per-FG cleanup is safe because clustering always + creates new blob files in the target FG. For external blobs, a cross-FG + check is required before deletion. +``` + +### Example 7: Non-bootstrapped external blobs at scale -- cross-FG verification is the common path + +**Demonstrates:** C3, C13, R6 (Flow 2) + +``` +Setup: + A media company stores 10M video files in s3://media-library/. + They create a Hudi table with a blob column referencing these videos. + They do NOT bootstrap -- Hudi manages refs, not storage layout. + + The table has 50K file groups across 1K partitions. + Many videos are referenced by multiple records (e.g., a popular video + appears in multiple user playlists across different partitions). + + Partition users/alice, FG-101: + Slice @t1: row1.blob_ref = (s3://media-library/video_X.mp4, managed=true) + + Partition users/bob, FG-202: + Slice @t1: row2.blob_ref = (s3://media-library/video_X.mp4, managed=true) + + Partition users/carol, FG-303: + Slice @t1: row3.blob_ref = (s3://media-library/video_X.mp4, managed=true) + +Action: + Cleaner expires FG-101's slice @t1 (alice deleted her playlist entry). + +Naive per-FG cleanup (incorrect): + FG-101 expired refs = {(s3://media-library/video_X.mp4, 0, 50MB)} + FG-101 retained refs = {} + Orphaned within FG-101 -> DELETE video_X.mp4 + Bob and Carol lose their video. Data corruption. + +Naive full-table scan (correct but expensive): + To verify video_X.mp4 is safe to delete, scan ALL 50K file groups + for references. 
This is correct but violates R6 -- the cost is + proportional to table size, not to the number of candidates. + +Scale concern: + If the cleaner expires 500 file groups and produces 2,000 external + blob candidates, and each candidate requires a full-table scan, + the cleanup cost is 2,000 * 50K = 100M file group checks. + This is prohibitive. + +Correct behavior: + Cross-FG verification for external blobs must use a targeted + mechanism (e.g., index lookup, partitioned scan with predicate + pushdown) that scales with the number of candidates, not with + the total table size. The mechanism must be a first-class design + element, not a fallback path. +``` + +### Example 8: MOR log-chain transient blob -- introduced and superseded within logs + +**Demonstrates:** C5, R2 + +``` +Setup (MOR table): + File Group FG-1: + Base file @t1: row1.blob_ref = (blob_A.bin, managed=true) + Log file @t2: row1.blob_ref = (blob_B.bin, managed=true) -- update + Log file @t3: row1.blob_ref = (blob_C.bin, managed=true) -- another update + + After merge: row1's effective blob_ref is blob_C.bin. + blob_B.bin was introduced at t2 and superseded at t3 -- it exists ONLY in log @t2. + +After compaction @t4: + Compacted base @t4: row1.blob_ref = (blob_C.bin, managed=true) + The pre-compaction slice (base @t1 + logs @t2, @t3) is now expired. + +Cleanup scenario: + Cleaner expires the pre-compaction slice. + Retained slice = compacted base @t4, refs = {blob_C.bin}. + + If expired slice reads only the base file: + expired_refs = {blob_A.bin} (from base @t1) + local_orphans = {blob_A.bin} - {blob_C.bin} = {blob_A.bin} + blob_A.bin is correctly identified as orphaned. + blob_B.bin is MISSED -- it exists only in expired log @t2. + blob_B.bin becomes a permanent orphan (R2 violation). 
+ + If expired slice reads base + log files: + expired_refs = {blob_A.bin, blob_B.bin, blob_C.bin} + local_orphans = {blob_A.bin, blob_B.bin, blob_C.bin} - {blob_C.bin} + = {blob_A.bin, blob_B.bin} + Both orphaned blobs are correctly identified and deleted. + +Why this matters: + Transient blob refs that are introduced and superseded entirely within + the log chain never appear in any base file. They can only be discovered + by reading the expired log files. Without log reads on the expired side, + every such transient blob becomes a permanent orphan that accumulates + storage indefinitely. +``` + +--- + +## 7. Open Questions + +These questions must be answered by any solution design. They are not prescriptive -- multiple valid +answers exist for each. + +**Q1: What is blob identity?** +How does the cleanup algorithm identify a specific blob? By path alone? By the tuple +`(path, offset, length)`? By `(path, generation/version)`? The identity model determines how +deduplication, container handling, and delete-and-re-add (C2) are handled. + +**Q2: Where is liveness computed?** +Is the set of live blob references computed at write time (incremental), at clean time (batch), or +some combination? Write-time computation amortizes cost but requires additional metadata storage. +Clean-time computation avoids write overhead but may be expensive at scale. + +**Q3: What is the unit of cleanup planning?** +Does blob cleanup plan per-file-group (aligned with the existing cleaner), per-partition, or globally? +Per-FG is naturally aligned with OCC (C7) but cannot handle cross-FG sharing (C3) without extension. +Global planning handles cross-FG sharing but risks violating C7. + +**Q4: How does blob cleanup interact with archival?** +If the cleanup algorithm depends on commit metadata to determine which blobs were written, what +happens when those commits are archived (C12)? Must blob cleanup complete before archival? 
Must the +relevant metadata be persisted outside the active timeline? + +**Q5: Extension or separate service?** +Should blob cleanup be an extension of the existing file slice cleaner (same plan, same execution +phase) or a separate service (independent schedule, independent timeline action)? Extension aligns +lifecycle but increases cleaner complexity. Separation simplifies each component but introduces +coordination challenges. + +**Q6: Failure mode and recovery if premature deletion occurs?** +Despite best efforts, what happens if a blob is prematurely deleted? Is there a detection mechanism +(query-time error surfacing)? Is there a recovery path (rebuild from an external source)? How does +the system distinguish "blob correctly not present" from "blob incorrectly deleted"? + +**Q7: How does cross-FG verification scale for non-path-dispatched blobs?** +For Flow 2 workloads where cross-FG sharing is common, what mechanism makes cross-FG verification +efficient? Options include: an MDT index mapping blob paths to referencing file groups, predicate +pushdown on the blob ref column during targeted scans, a reference count maintained at write time, +or a bloom filter index. The chosen mechanism must satisfy R6 (cost proportional to candidates, not +table size) and C7 (no global serialization). How does this mechanism interact with writes, and what +is its maintenance cost? 
From b1e3599d7da434c730ba551c73ff278663e4b837 Mon Sep 17 00:00:00 2001 From: voon Date: Sun, 22 Mar 2026 14:15:13 +0800 Subject: [PATCH 2/6] Update cost --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index 85404644af084..20bc8cf2b66b2 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -634,7 +634,7 @@ cleaning: |--------------------------|---------------------------------|----------------------------|--------------------------------| | Non-blob table | Zero (`hasBlobColumns` gate) | N/A | **Zero** | | Flow 1 (Hudi-created) | ~6 Parquet reads per cleaned FG | Skipped | O(cleaned_FGs * slices_per_FG) | -| Flow 2 (external, index) | ~6 Parquet reads per cleaned FG | O(candidates) amortized | O(cleaned_FGs + candidates) | +| Flow 2 (external, index) | ~6 Parquet reads per cleaned FG | O(C * R_avg) | O(cleaned_FGs + C * R_avg) | | Flow 2 (external, scan) | ~6 Parquet reads per cleaned FG | O(candidates * table_size) | Circuit breaker limits this | ### Back-of-Envelope: Example 7 (50K FGs, 2K External Candidates) From 712b146edf6474664b33eae840c634233ed78b26 Mon Sep 17 00:00:00 2001 From: voon Date: Sun, 22 Mar 2026 15:07:22 +0800 Subject: [PATCH 3/6] Add SI -> RI consideration to appendix --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 28 ++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index 20bc8cf2b66b2..c5e52908865bc 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -747,3 +747,31 @@ Closes the writer-cleaner race window. Independent of the three stages. 
 - **[Problem Statement, Constraints & Requirements](rfc-100-blob-cleaner-problem.md)** --
   Complete problem scope, all 13 constraints (C1-C13), all 10 requirements (R1-R10), 8 illustrative
   failure mode examples, and open questions.
+
+### Why the MDT Secondary Index Maps to Record Keys (Not File Groups)
+
+Stage 2 uses a two-hop lookup: secondary index → record keys → record index → file group locations.
+This is not an artifact of this RFC — it is the fundamental design of Hudi's secondary index
+([RFC-77](../rfc-77/rfc-77.md)). The rationale:
+
+1. **Secondary keys are non-unique.** Unlike the record index (which maps unique record keys),
+   a secondary index is on arbitrary user columns (e.g., `city`, `status`) where many records
+   share the same value. The composite key format `{secondaryKey}${recordKey}` flattens this
+   non-unique mapping into unique tuples that fit the existing spillable/merge map infrastructure.
+
+2. **Record locations change independently of secondary key values.** Compaction, clustering,
+   and updates move records between file groups. The record index already maintains this mapping
+   correctly. A denormalized `secondary_key → file_group` mapping would duplicate that
+   maintenance burden and risk staleness.
+
+3. **Update handling requires tombstones on old values.** When a record's secondary key changes,
+   the old value may reside in a different file group in the SI partition than the new value.
+   The normalized design handles this with `old-secondary-key → (record-key, deleted)` tombstones,
+   which is simpler than tracking file group transitions directly.
+
+4. **Alternatives were evaluated and rejected.** RFC-77 considered direct `secondary_key →
+   file_group` mapping, Guava MultiMap, Chronicle Map, and separate spillable structures — all
+   rejected due to complexity, external dependencies, or maintenance cost.
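
The two-hop lookup described in the points above can be sketched with a toy in-memory model. This is only an illustration, not Hudi's implementation: `prefix_scan`, `file_groups_for`, the sorted list standing in for the secondary index, and the dict standing in for the record index are all hypothetical names, and the sketch assumes keys contain no `$` so that no escaping is needed.

```python
from bisect import bisect_left

SEP = "$"  # separator in the composite key format {secondaryKey}${recordKey}


def prefix_scan(sorted_keys, secondary_key):
    """Hop 1: return the record keys of all composite keys for one secondary key."""
    prefix = secondary_key + SEP
    start = bisect_left(sorted_keys, prefix)  # first key >= prefix
    record_keys = []
    for key in sorted_keys[start:]:
        if not key.startswith(prefix):
            break  # keys sharing a prefix are contiguous in sorted order
        record_keys.append(key[len(prefix):])
    return record_keys


def file_groups_for(secondary_key, secondary_index, record_index):
    """Hop 2: resolve each record key to its current file group."""
    record_keys = prefix_scan(secondary_index, secondary_key)
    return {record_index[rk] for rk in record_keys if rk in record_index}


# Toy secondary index: sorted composite keys (hypothetical data).
secondary_index = sorted([
    "s3://ext/photo.png$row1",
    "s3://ext/video.mp4$row2",
    "s3://ext/video.mp4$row7",
])
# Toy record index: record key -> file group.
record_index = {"row1": "FG-1", "row2": "FG-2", "row7": "FG-5"}

before = file_groups_for("s3://ext/video.mp4", secondary_index, record_index)

# Clustering moves row2 to another file group: only the record index changes.
record_index["row2"] = "FG-9"
after = file_groups_for("s3://ext/video.mp4", secondary_index, record_index)
```

In this model, moving `row2` touches only the record index, which is the point of rationale 2: the secondary index entries stay valid because they never stored a file group.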
+ +For this RFC, the two-hop cost is negligible: Step 1 (prefix scan) and Step 2 (record index lookup) +are each a single batched HFile forward-scan, adding ~3-7s total for 2K candidates. From 330df789788def57cbd3284d142ebdd69b29c47d Mon Sep 17 00:00:00 2001 From: voon Date: Mon, 30 Mar 2026 18:19:41 +0800 Subject: [PATCH 4/6] feat(blob): Address PR review - Add Example 9 to problem doc: demonstrates why blobFilesToDelete must be in the plan (writer-cleaner conflict resolution for external blobs) - Use engine-context HoodieData instead of HoodieListData.eager() for Stage 2 MDT lookups to distribute large candidate sets across executors - Update memory budget to reflect distributed Stage 2 processing --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 13 +++- rfc/rfc-100/rfc-100-blob-cleaner-problem.md | 68 +++++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index c5e52908865bc..f2988e1818efb 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -289,15 +289,18 @@ candidate_paths = external_candidates.map(ref -> ref.path).distinct() // Step 1: Batched prefix scan on secondary index // Key format: escaped(external_path)$escaped(record_key) // Returns ALL record keys that reference each candidate path +// Uses engine-context HoodieData (e.g., RDD on Spark) to distribute work +// across executors -- candidate sets can be large (row-level blob refs). +candidate_paths_data = engineContext.parallelize(candidate_paths) path_to_record_keys = mdtMetadata.readSecondaryIndexDataTableRecordKeysWithKeys( - HoodieListData.eager(candidate_paths), indexPartitionName) + candidate_paths_data, indexPartitionName) .groupBy(pair -> pair.getKey()) // Step 2: Batch record index lookup -- ONE call for ALL record keys // Sorts keys internally, single sequential forward-scan through HFile. 
all_record_keys = path_to_record_keys.values().flatMap() all_locations = mdtMetadata.readRecordIndexLocations( - HoodieListData.eager(all_record_keys)) // -> Map + all_record_keys) // -> Map // Step 3: In-memory resolution with short-circuit per candidate for path in candidate_paths: @@ -659,7 +662,11 @@ cleaning: Per-FG blob ref sets: ~100MB peak (500K records * 100 bytes/ref for expired + retained). FGs are processed sequentially within each partition batch -- per-FG sets are computed and discarded, not accumulated. Only the output lists (`hudi_blob_deletes`, `external_candidates`) grow, containing -only orphaned refs (much smaller). Peak heap: ~100MB * `cleanerParallelism` = 400MB-1.6GB. +only orphaned refs (much smaller). Peak heap for Stage 1: ~100MB * `cleanerParallelism` = 400MB-1.6GB. + +Stage 2 output lists (`candidate_paths`, `all_record_keys`) can be large -- each cleaned FG may +contribute row-level blob refs as candidates. These are backed by engine-context `HoodieData` +(e.g., Spark RDD) and distributed across executors, avoiding driver memory pressure. --- diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-problem.md b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md index 61b2f79f5e16f..c13d09f3e588e 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-problem.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md @@ -630,6 +630,74 @@ Why this matters: storage indefinitely. ``` +### Example 9: Why blobFilesToDelete must be in the plan -- writer-cleaner conflict resolution + +**Demonstrates:** C7, R1, R5 (extends Example 5, Scenario B) + +``` +Setup: + File Group FG-1 (partition users/alice): + Slice @t1 (expired): row1.blob_ref = (s3://ext/video.mp4, managed=true) + Slice @t3 (retained): row1.blob_ref = (s3://ext/photo.png, managed=true) + video.mp4 is locally orphaned in FG-1 (updated to photo.png at t3). + No other FG references video.mp4 at plan time. 
+ + File Group FG-2 (partition users/bob): + (exists, but does not reference video.mp4 yet) + +Approach A: blobFilesToDelete NOT in plan (execution-time computation) + t1: Cleaner plans at timeline fence T. + Plan = {filePathsToBeDeleted: [FG-1/@t1]} + No blob info written to plan. Plan goes to timeline as REQUESTED. + + t2: Writer commits to FG-2, adds row2.blob_ref = (s3://ext/video.mp4). + Writer checks for conflicts: the clean plan on the timeline has no + blob info -- only filePathsToBeDeleted for FG-1. Writer is on FG-2. + No conflict detected. Writer succeeds. + + t3: Cleaner transitions to INFLIGHT. Executor computes blob deletes: + Reads FG-1/@t1 -> expired_refs = {video.mp4} + Reads FG-1/@t3 -> retained_refs = {photo.png} + video.mp4 locally orphaned -> Stage 2 cross-FG check at fence T + -> does NOT see writer's commit at t2 -> globally orphaned -> DELETE + + Result: FG-2 row2 now has a dangling reference to video.mp4. + Bob queries his data and gets a missing blob error. Data corruption. + + Why it cannot be fixed: the blob delete decision existed only in the + executor's memory. There was no artifact on the timeline for the writer's + conflict resolution to check against. The cross-FG conflict was invisible. + +Approach B: blobFilesToDelete IN the plan (plan-time computation) + t1: Cleaner plans at timeline fence T. + Stage 1: video.mp4 locally orphaned in FG-1. + Stage 2: cross-FG check -> no other FG references video.mp4. + Plan = {filePathsToBeDeleted: [FG-1/@t1], + blobFilesToDelete: [s3://ext/video.mp4]} + Plan goes to timeline as REQUESTED. + + t2: Writer commits to FG-2, adds row2.blob_ref = (s3://ext/video.mp4). + Writer's conflict resolution checks inflight/requested clean plan. + Sees blobFilesToDelete contains video.mp4 -- the same blob the + writer is referencing. CONFLICT DETECTED. Writer aborts and retries + after the clean cycle completes (or clean plan is rolled back). + -> Safe. The conflict is caught before corruption can occur. 
+ + Alternative: if the writer commits first (wins the race), the clean + plan's conflict resolution at INFLIGHT transition detects that a new + commit references a blob in blobFilesToDelete. Clean plan is invalidated + and re-planned in the next cycle, where it will see FG-2's reference + and retain video.mp4. + +Key insight: + Today, clean actions are not part of OCC conflict resolution + (TransactionUtils.getInflightAndRequestedInstants excludes CLEAN_ACTION, + and ConcurrentOperation.init throws for clean actions). Adding external + blob cleanup requires extending conflict resolution to check + blobFilesToDelete. This is only possible if the blob delete list is a + durable artifact on the timeline -- i.e., part of the plan. +``` + --- ## 7. Open Questions From ca204242b0bdaaec694ad56f9ed82b610c0606c2 Mon Sep 17 00:00:00 2001 From: voon Date: Mon, 6 Apr 2026 13:32:38 +0800 Subject: [PATCH 5/6] Address comments. --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 368 +++++------- rfc/rfc-100/rfc-100-blob-cleaner-problem.md | 594 +++++++++++--------- 2 files changed, 456 insertions(+), 506 deletions(-) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index f2988e1818efb..7ae9335730859 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -15,7 +15,7 @@ limitations under the License. --> -# RFC-100 Part 2: Blob Cleanup for Unstructured Data +# RFC-100 Part 2: External Blob Cleanup for Unstructured Data ## Proposers @@ -23,7 +23,9 @@ ## Approvers -- (TBD) +- @rahil-c +- @vinothchandar +- @yihua ## Status @@ -35,15 +37,16 @@ Issue: ## Abstract -When Hudi cleans expired file slices, out-of-line blob files they reference may become orphaned -- -still consuming storage but unreachable by any query. This RFC extends the existing file slice -cleaner to identify and delete these orphaned blob files safely and efficiently. 
The design uses a -three-stage pipeline: (1) per-file-group set-difference to find locally-orphaned blobs, (2) an MDT -secondary index lookup for cross-file-group verification of externally-referenced blobs, and (3) -container file lifecycle resolution. For Hudi-created blobs, cleanup is essentially free -- structural -path uniqueness eliminates cross-file-group concerns entirely. For user-provided external blobs, -targeted index lookups scale with the number of candidates, not the table size. Tables without blob -columns pay zero cost. +When Hudi cleans expired file slices, external out-of-line blob files they reference may become +orphaned -- still consuming storage but unreachable by any query. This RFC extends the existing file +slice cleaner to identify and delete these orphaned blob files safely and efficiently. The design +uses a two-stage pipeline: (1) per-file-group set-difference to find locally-orphaned blobs, and +(2) cross-file-group verification via MDT secondary index lookup. Targeted index lookups scale with +the number of candidates, not the table size. Tables without blob columns pay zero cost. + +This design focuses on **external blobs** -- the Phase 1 use case of RFC-100 where users have +existing blob files in external storage (e.g., `s3://media-bucket/videos/`) and Hudi manages the +*references* via the `BlobReference` schema, not the *storage layout*. --- @@ -52,65 +55,54 @@ columns pay zero cost. ### Why Blob Cleanup Is Needed RFC-100 introduces out-of-line blob storage for unstructured data (images, video, documents). A -record's `BlobReference` field points to an external blob file by `(path, offset, length)`. When +record's `BlobReference` field points to an external blob file by `reference.external_path`. When the cleaner expires old file slices, the blob files they reference may no longer be needed -- but the existing cleaner has no concept of transitive references. 
It deletes file slices without considering the blob files they point to. Without blob cleanup, orphaned blobs accumulate indefinitely. -### Two Blob Flows - -Blob cleanup must support two distinct entry flows with fundamentally different properties: - -**Flow 1 -- Hudi-created blobs.** Blobs created by Hudi's write path, stored at -`{table}/.hoodie/blobs/{partition}/{col}/{instant}/{blob_id}`. The commit instant in the path -guarantees uniqueness (C11), and blobs are scoped to a single file group (P3). Cross-file-group -sharing does not occur. This is the expected majority flow for Phase 3 workloads. +### External Blobs -**Flow 2 -- User-provided external blobs.** Users have existing blob files in external storage -(e.g., `s3://media-bucket/videos/`). Records reference these blobs directly by path. Hudi manages -the *references*, not the *storage layout*. Cross-file-group sharing is common -- multiple records -across different file groups can point to the same blob. This is the expected primary flow for -Phase 1 workloads. +Users have existing blob files in external storage (e.g., `s3://media-bucket/videos/`). Records +reference these blobs directly by path. Hudi manages the *references*, not the *storage layout*. +Cross-file-group sharing is common -- multiple records across different file groups can point to the +same blob. 
Key properties: -| Property | Flow 1 (Hudi-created) | Flow 2 (External) | -|---------------------------|-----------------------------------|--------------------------------------| -| Path uniqueness | Guaranteed (instant in path, C11) | Not guaranteed (user controls) | -| Cross-FG sharing | Does not occur (FG-scoped) | Common (multiple records, same blob) | -| Writer/cleaner race | Cannot occur (D2) | Can occur (D3) | -| Per-FG cleanup sufficient | Yes | No -- cross-FG verification needed | +| Property | External blobs | +|---------------------------|----------------------------------------------| +| Path uniqueness | Not guaranteed (user controls) | +| Cross-FG sharing | Common (multiple records, same blob) | +| Writer/cleaner race | Can occur (external paths outside MVCC) | +| Per-FG cleanup sufficient | No -- cross-FG verification needed | ### Constraints and Requirements Reference -Full descriptions and failure modes in [Appendix B](rfc-100-blob-cleaner-problem.md). - -| ID | Constraint | Flow 1 | Flow 2 | Remarks | -|-----|-------------------------------------------------|--------|--------|------------------------------| -| C1 | Blob immutability (append-once, read-many) | Y | Y | | -| C2 | Delete-and-re-add same path | -- | Y | Eliminated for Flow 1 by C11 | -| C3 | Cross-file-group blob sharing | -- | Y | Common for external blobs | -| C4 | Container files (`(offset, length)` ranges) | Y | Y | | -| C5 | MOR log updates shadow base file blob refs | Y | Y | | -| C6 | Existing cleaner is per-file-group scoped | Y | Y | | -| C7 | OCC is per-file-group | Y | Y | No global contention allowed | -| C8 | Clustering moves blob refs between file groups | Y | Y | | -| C9 | Savepoints freeze file slices and blob refs | Y | Y | | -| C10 | Rollback can invalidate or resurrect references | Y | Y | | -| C11 | Blob paths include commit instant | Y | -- | Eliminates C2, C3, C13 | -| C12 | Archival removes commit metadata | Y | Y | | -| C13 | Cross-FG verification needed at 
scale | -- | Y | | +Full descriptions and failure modes in [Problem Statement](rfc-100-blob-cleaner-problem.md). + +| ID | Constraint | Remarks | +|-----|-----------------------------------------------------|----------------------------------| +| C1 | Blob immutability (append-once, read-many) | | +| C2 | Delete-and-re-add same path | Real concern for external blobs | +| C3 | Cross-file-group blob sharing | Common for external blobs | +| C4 | MOR log updates shadow base file blob refs | | +| C5 | Existing cleaner is per-file-group scoped | | +| C6 | OCC is per-file-group | No global contention allowed | +| C7 | Replace commits move blob refs between file groups | Clustering, insert_overwrite | +| C8 | Savepoints freeze file slices and blob refs | | +| C9 | Rollback and restore can invalidate or resurrect | | +| C10 | Archival removes commit metadata | | +| C11 | Cross-FG verification needed at scale | | | ID | Requirement | |-----|------------------------------------------------------------------| | R1 | No premature deletion (hard invariant) | | R2 | No permanent orphans (bounded cleanup) | -| R3 | Container awareness (range-level liveness) | -| R4 | MOR correctness (over-retention acceptable, under-retention not) | -| R5 | Concurrency safety (no global serialization) | -| R6 | Scale proportional to work, not table size | -| R7 | No cost for non-blob tables | -| R8 | All cleaning policies supported | -| R9 | Crash safety and idempotency | -| R10 | Observability (metrics for deleted, retained, reclaimed) | +| R3 | MOR correctness (over-retention acceptable, under-retention not) | +| R4 | Concurrency safety (no global serialization) | +| R5 | Scale proportional to work, not table size | +| R6 | No cost for non-blob tables | +| R7 | All cleaning policies supported | +| R8 | Crash safety and idempotency | +| R9 | Observability (metrics for deleted, retained, reclaimed) | --- @@ -120,46 +112,28 @@ Full descriptions and failure modes in [Appendix 
B](rfc-100-blob-cleaner-problem Blob cleanup extends the existing `CleanPlanner` / `CleanActionExecutor` pipeline -- same timeline instant, same plan-execute-complete lifecycle, same crash recovery and OCC integration. A -`hasBlobColumns()` check gates all blob logic so non-blob tables pay zero cost. - -The two flows have different cost structures, and the design keeps them separate. Flow 1 -(Hudi-created blobs) gets per-FG cleanup with no cross-FG overhead. Flow 2 (external blobs) gets -targeted cross-FG verification via MDT secondary index. Dispatch is a string prefix check on the -blob path. - -### Three-Stage Pipeline +`hasBlobColumns()` check gates all blob logic so non-blob tables pay near zero cost (schema scan +cost). -| Stage | Scope | Purpose | When it runs | -|-------------|----------------------|----------------------------------------------------------------------------------|----------------------------------------| -| **Stage 1** | Per-file-group | Collect expired/retained blob refs, compute set difference, dispatch by category | Always (for blob tables) | -| **Stage 2** | Cross-file-group | Verify external blob candidates against MDT secondary index or fallback scan | Only when external candidates exist | -| **Stage 3** | Container resolution | Determine delete vs. flag-for-compaction at the container level | Only when container blobs are involved | +External blobs require cross-file-group verification because the same blob can be referenced from +multiple file groups (C3, C11). The design uses targeted MDT secondary index lookups that scale +with the number of candidates, not the table size. 
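
As a rough illustration of why a per-file-group check alone is not enough for shared external blobs, the sketch below models both stages with plain Python sets. It is a simplified model of the pseudocode in this RFC, not the planner's actual code: the function names are hypothetical, and `referencing_fgs` is a stand-in for the result of the MDT secondary index and record index lookups.

```python
def stage1_local_orphans(expired_refs, retained_refs):
    """Stage 1: paths referenced only by the expired slices of one file group."""
    return set(expired_refs) - set(retained_refs)


def stage2_confirm(candidates, referencing_fgs, cleaned_fg_ids):
    """Stage 2: keep candidates with no reference from outside the cleaned file groups."""
    confirmed = set()
    for path in candidates:
        # any() stops at the first live reference (short-circuit per candidate).
        live = any(fg not in cleaned_fg_ids
                   for fg in referencing_fgs.get(path, ()))
        if not live:
            confirmed.add(path)
    return confirmed


# FG-1 replaced video.mp4 with photo.png, so video.mp4 is a local orphan there,
# but FG-2 (not being cleaned) still points at it.
candidates = stage1_local_orphans(
    expired_refs={"s3://ext/video.mp4", "s3://ext/old.png"},
    retained_refs={"s3://ext/photo.png"},
)
referencing_fgs = {"s3://ext/video.mp4": {"FG-2"}}  # old.png has no references
to_delete = stage2_confirm(candidates, referencing_fgs, cleaned_fg_ids={"FG-1"})
print(sorted(to_delete))  # ['s3://ext/old.png']
```

Stage 1 flags both paths as local orphans, but only `old.png` survives Stage 2; deleting `video.mp4` would leave FG-2 with a dangling reference.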
-### Independent Implementability +### Two-Stage Pipeline -The three stages have clean input/output interfaces and can be implemented, tested, and shipped -independently: - -| Stage | Input | Output | -|---------|---------------------------------------------------------|-----------------------------------------------------| -| Stage 1 | `FileGroupCleanResult` (expired + retained slices) | `hudi_blob_deletes`, `external_candidates` | -| Stage 2 | `external_candidates`, `cleaned_fg_ids` | `external_deletes` | -| Stage 3 | `hudi_blob_deletes` + `external_deletes`, retained refs | `blob_files_to_delete`, `containers_for_compaction` | - -A shared foundation layer must land first (see [Rollout / Adoption Plan](#rollout--adoption-plan)), after which stages -can proceed in any order. +| Stage | Scope | Purpose | When it runs | +|-------------|------------------|----------------------------------------------------------------------|------------------------------| +| **Stage 1** | Per-file-group | Collect expired/retained blob refs, compute set difference | Always (for blob tables) | +| **Stage 2** | Cross-file-group | Verify candidates against MDT secondary index or fallback scan | When local orphans exist | ### Key Decisions -| Decision | Choice | Rationale | -|---------------------|---------------------------------------------------------|--------------------------------------------------------------------| -| Blob identity | `(path, offset, length)` tuple | Handles containers (C4) and path reuse (C2) correctly | -| Cleanup scope | Per-FG (Hudi blobs) + MDT index lookup (external blobs) | Aligns with OCC (C7) and existing cleaner (C6); scales for C13 | -| Dispatch mechanism | Path prefix check on blob path | Zero-cost classification; Hudi blobs match `.hoodie/blobs/` prefix | -| Cross-FG mechanism | MDT secondary index on `reference.external_path` | Short-circuits on first non-cleaned FG ref; first-class for Flow 2 | -| Write-path overhead | None (Flow 1); MDT index 
maintenance (Flow 2) | Index maintained by existing MDT pipeline, not a new write cost | -| MOR strategy | Over-retain (union of base + log refs) | Safe (C5, R4); cleaned after compaction | -| Container strategy | Tuple-level tracking; delete only when all ranges dead | Correct (C4, R3); partial containers flagged for blob compaction | +| Decision | Choice | Rationale | +|---------------------|---------------------------------------------------------|----------------------------------------------------------------| +| Blob identity | `reference.external_path` | Path-based identity for external blobs | +| Cleanup scope | Per-FG candidate identification + cross-FG verification | Aligns with OCC (C6) and existing cleaner (C5); scales for C11 | +| Cross-FG mechanism | MDT secondary index on `reference.external_path` | Short-circuits on first non-cleaned FG ref | +| MOR strategy | Over-retain (union of base + log refs) | Safe (C4, R3); cleaned after compaction | ```mermaid flowchart LR @@ -172,35 +146,29 @@ flowchart LR subgraph CP["CleanPlanner (per-partition, per-FG)"] direction TB Policy["Policy method
→ FileGroupCleanResult
(expired + retained slices)"] - S1["Stage 1
Per-FG blob ref
set difference + dispatch"] + S1["Stage 1
Per-FG blob ref
set difference"] Policy --> S1 end S1 --> S2["Stage 2
Cross-FG verification
(MDT secondary index)"] - S1 -->|hudi_blob_deletes| S3 - S2 -->|external_deletes| S3["Stage 3
Container lifecycle
resolution"] end subgraph Plan["HoodieCleanerPlan"] FP["filePathsToBeDeleted
(existing)"] BP["blobFilesToDelete
(new)"] - CC["containersToCompact
(new)"] end - S3 --> BP - S3 --> CC + S2 --> BP CP --> FP subgraph Execution["CleanActionExecutor.runClean()"] direction TB DF["Delete file slices
(existing, parallel)"] DB["Delete blob files
(new, parallel)"] - RC["Record containers
for blob compaction"] end FP --> DF BP --> DB - CC --> RC ``` --- @@ -211,26 +179,25 @@ flowchart LR Stage 1 runs after the existing policy logic determines which file slices are expired and retained for a given file group. It collects blob refs from both sets and computes locally-orphaned blobs by -set difference. +set difference. All local orphans proceed to Stage 2 for cross-FG verification. ``` Input: A file group FG with expired_slices and retained_slices (from policy) -Output: hudi_blob_deletes -- blobs safe to delete immediately - external_candidates -- external blobs needing cross-FG verification +Output: local_orphan_candidates -- external blobs needing cross-FG verification for each file_group being cleaned: // Collect expired blob refs (base files + log files) // Must read log files: blob refs introduced and superseded within the log // chain before compaction would otherwise become permanent orphans. - expired_refs = Set<(path, offset, length)>() + expired_refs = Set() for slice in expired_slices: for ref in extractBlobRefs(slice.baseFile): // columnar projection if ref.type == OUT_OF_LINE and ref.managed == true: - expired_refs.add((ref.path, ref.offset, ref.length)) + expired_refs.add(ref.external_path) for ref in extractBlobRefs(slice.logFiles): // full record read if ref.type == OUT_OF_LINE and ref.managed == true: - expired_refs.add((ref.path, ref.offset, ref.length)) + expired_refs.add(ref.external_path) if expired_refs is empty: continue // no blob work for this FG @@ -239,52 +206,47 @@ for each file_group being cleaned: // Cleaning is fenced on compaction: retained base files contain the merged // state. Log reads are unnecessary -- any shadowed base ref causes safe // over-retention, cleaned after the next compaction cycle. 
- retained_refs = Set<(path, offset, length)>() + retained_refs = Set() for slice in retained_slices: for ref in extractBlobRefs(slice.baseFile): // columnar projection only if ref.type == OUT_OF_LINE and ref.managed == true: - retained_refs.add((ref.path, ref.offset, ref.length)) + retained_refs.add(ref.external_path) // Compute local orphans by set difference local_orphans = expired_refs - retained_refs - // Dispatch by blob category - for ref in local_orphans: - if ref.path starts with TABLE_PATH + "/.hoodie/blobs/": - hudi_blob_deletes.add(ref) // P3: no cross-FG refs possible - else: - external_candidates.add(ref) // C13: cross-FG refs are common + // All local orphans proceed to Stage 2 for cross-FG verification + all_local_orphans.addAll(local_orphans) ``` **Correctness notes:** -- **Hudi-created blobs:** If a blob ref appears in expired but not retained slices of the same FG, - it is globally orphaned -- Hudi blobs are FG-scoped (C11), so no cross-FG check is needed. - **MOR -- expired side reads base + logs:** Blob refs can be introduced and superseded entirely - within the log chain (e.g., `log@t2: row1→blob_B`, then `log@t3: row1→blob_C`). After + within the log chain (e.g., `log@t2: row1->blob_B`, then `log@t3: row1->blob_C`). After compaction, `blob_B` exists only in the expired log. Skipping logs would orphan it permanently. - **MOR -- retained side reads base only:** Cleaning is fenced on compaction, so retained base files contain the merged state. Shadowed base refs cause over-retention (safe), cleaned after the next compaction. - **Savepoints:** Inherited from existing cleaner -- savepointed slices stay in the retained set. -- **Replaced FGs (clustering):** `retained_slices` is empty, so all blob refs become candidates. - Hudi blobs are safe to delete (clustering creates new blobs in the target FG). External blobs - flow to Stage 2 (clustering copies the pointer, so Stage 2 finds it in the target FG). 
+- **Replaced FGs (replace commits):** `retained_slices` is empty, so all blob refs become + candidates. For external blobs, clustering copies the pointer to the target FG, so Stage 2 + finds the reference in the target FG and retains the blob. -### Stage 2: Cross-FG Verification (External Blobs) +### Stage 2: Cross-File-Group Verification -Stage 2 executes only when `external_candidates` is non-empty. For Flow 1 workloads (Hudi-created -blobs only), this stage is skipped entirely. +Stage 2 verifies each local orphan candidate against the global state to determine if the blob is +still referenced by any active file slice outside the cleaned file groups. This is necessary because +external blobs can be shared across file groups (C3, C11). #### Primary path: MDT secondary index When the MDT secondary index on `reference.external_path` is available and fully built: ``` -Input: external_candidates, cleaned_fg_ids -Output: external_deletes (confirmed globally orphaned) +Input: all_local_orphans, cleaned_fg_ids +Output: blob_files_to_delete (confirmed globally orphaned) -candidate_paths = external_candidates.map(ref -> ref.path).distinct() +candidate_paths = all_local_orphans.distinct() // Step 1: Batched prefix scan on secondary index // Key format: escaped(external_path)$escaped(record_key) @@ -307,7 +269,7 @@ for path in candidate_paths: record_keys = path_to_record_keys.getOrDefault(path, []) if record_keys is empty: - external_deletes.addAll(candidates for this path) // globally orphaned + blob_files_to_delete.add(path) // globally orphaned continue found_live_reference = false @@ -315,10 +277,10 @@ for path in candidate_paths: location = all_locations.get(rk) if location != null and location.fileId NOT in cleaned_fg_ids: found_live_reference = true - break // short-circuit (in-memory) + break // short-circuit (in-memory) if not found_live_reference: - external_deletes.addAll(candidates for this path) // all refs in cleaned FGs + blob_files_to_delete.add(path) // 
all refs in cleaned FGs ``` **Cost model.** Three steps: (1) batched prefix scan on secondary index, (2) batched record index @@ -332,10 +294,7 @@ are each a single I/O pass; step 3 is pure hash set lookups. | 3. In-memory resolution | Hash set checks (cleaned_fg_ids) | ~0ms | *Estimates assume cloud object storage (S3/GCS/ADLS), ~10-100ms per-read latency, ~50-200 MB/s -sequential throughput, 64-256KB HFile blocks. Step 1: a 250K-entry secondary index is ~50MB; a -sorted scan for 2K prefixes reads ~200-500 blocks, dominated by HFile open (~100-200ms) plus -sequential block reads with cold-cache latency overhead. Step 2: similar profile, fewer blocks -hit relative to index size. Pending benchmarking.* +sequential throughput, 64-256KB HFile blocks. Pending benchmarking.* **Index definition.** Uses the existing `HoodieIndexDefinition` mechanism with `sourceFields = ["", "reference", "external_path"]`. The nested field path is supported @@ -355,12 +314,12 @@ a bottleneck on large tables. The operator is warned to enable the MDT secondary #### Decision matrix -| Condition | Path used | Cost | Suitable for | -|-----------------------------|---------------|-----------------------|--------------------------------| -| No external candidates | Skip Stage 2 | Zero | Flow 1 workloads | -| MDT secondary index enabled | Index lookup | O(candidates) | Flow 2 at any scale | -| No index, few candidates | Table scan | O(candidates * table) | Small tables, few shared blobs | -| No index, many candidates | Circuit break | Zero (deferred) | Large tables -- index required | +| Condition | Path used | Cost | Suitable for | +|-----------------------------|---------------|-----------------------|---------------------------| +| No local orphan candidates | Skip Stage 2 | Zero | No blob work this cycle | +| MDT secondary index enabled | Index lookup | O(candidates) | Any scale | +| No index, few candidates | Table scan | O(candidates * table) | Small tables | +| No index, many candidates | 
Circuit break | Zero (deferred) | Large tables need index | ```mermaid sequenceDiagram @@ -393,51 +352,23 @@ sequenceDiagram end ``` -### Stage 3: Container File Lifecycle - -Container files pack multiple blobs at different `(offset, length)` ranges. A container can only be -deleted when *all* ranges within it are unreferenced. - -``` -all_deletes = hudi_blob_deletes + external_deletes - -// Non-container blobs: each occupies an entire file -> delete directly -for ref in all_deletes where ref.offset is null: - blob_files_to_delete.add(ref.path) - -// Container blobs: group by path, check for remaining live ranges -for each container_path in container_range_deletes grouped by path: - dead_ranges = ranges being deleted for this container - live_ranges = retained refs (from Stage 1) + referenced refs (from Stage 2) - - if live_ranges is empty: - blob_files_to_delete.add(container_path) // entire container dead - else: - containers_for_compaction.add(container_path, dead_ranges) // partial -> repack -``` - -For Hudi-created containers, all ranges belong to the same FG (instant in path scopes it to a -single write), so retained refs from Stage 1 are sufficient -- no cross-FG check needed. - ### Execution Flow ``` 1. CleanPlanActionExecutor.requestClean() - ├── hasBlobColumns(table)? // R7: zero-cost gate + ├── hasBlobColumns(table)? // R6: zero-cost gate ├── CleanPlanner: for each partition, for each file group: │ ├── Refactored policy method -> FileGroupCleanResult │ └── If hasBlobColumns: Stage 1 per FG ├── CleanPlanner: replaced file groups -> Stage 1 - ├── If external_candidates non-empty: Stage 2 - ├── Stage 3: container lifecycle resolution - ├── Build HoodieCleanerPlan (+ blobFilesToDelete, containersToCompact) + ├── If local orphan candidates non-empty: Stage 2 + ├── Build HoodieCleanerPlan (+ blobFilesToDelete) └── Persist plan to timeline (REQUESTED state) 2. 
CleanActionExecutor.runClean() ├── Transition to INFLIGHT ├── Delete file slices (existing, parallelized) ├── Delete blob files (new, same parallelized pattern) - ├── Record containers for blob compaction (metadata only) ├── Build HoodieCleanMetadata with blobCleanStats └── Transition to COMPLETED ``` @@ -452,8 +383,7 @@ sequenceDiagram Note over P: requestClean() P->>P: Stage 1: per-FG blob ref set difference - P->>P: Stage 2: MDT index lookup (if external candidates) - P->>P: Stage 3: container lifecycle resolution + P->>P: Stage 2: MDT index lookup (if candidates exist) P->>TL: Persist HoodieCleanerPlan Note over TL: REQUESTED @@ -511,25 +441,23 @@ expired/retained classification logic. ### Replaced File Group Handling -Replaced file groups (from clustering) are cleaned via `getReplacedFilesEligibleToClean()`. A -parallel method `getReplacedFileGroupBlobCleanResults()` produces `FileGroupCleanResult` objects -with `retainedSlices = empty` and `expiredSlices = all slices`. This feeds into Stage 1 identically -to normal file groups. +Replaced file groups (from clustering, insert_overwrite, insert_overwrite_table) are cleaned via +`getReplacedFilesEligibleToClean()`. A parallel method `getReplacedFileGroupBlobCleanResults()` +produces `FileGroupCleanResult` objects with `retainedSlices = empty` and +`expiredSlices = all slices`. This feeds into Stage 1 identically to normal file groups. ### Schema Changes: HoodieCleanerPlan -Two new nullable fields with null defaults (backward compatible): +One new nullable field with null default (backward compatible): -- **`blobFilesToDelete`**: `List` -- blob files where all ranges are dead. - The executor deletes these. -- **`containersToCompact`**: `List` -- containers with mixed live/dead - ranges, including the dead `(offset, length)` ranges. Handed to the blob compaction service. +- **`blobFilesToDelete`**: `List` -- external blob files confirmed as + globally orphaned. The executor deletes these. 
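
As a rough illustration of how normal and replaced file groups feed Stage 1 identically, the per-file-group set difference might be sketched as follows. This is a minimal Python sketch, not Hudi code: the `FileGroupCleanResult` dataclass here is a hypothetical stand-in that models each slice as the set of external blob paths it references, `local_orphan_candidates` is an assumed helper name, and the file names under `s3://media-bucket/videos/` are made up.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FileGroupCleanResult:
    """Hypothetical stand-in: each slice is modelled as the set of
    external blob paths referenced by its records."""
    file_group_id: str
    retained_slices: list[set[str]] = field(default_factory=list)
    expired_slices: list[set[str]] = field(default_factory=list)


def local_orphan_candidates(result: FileGroupCleanResult) -> set[str]:
    """Stage 1 set difference: paths referenced by expired slices of this
    file group that no retained slice of the same file group references."""
    expired_refs = set().union(*result.expired_slices)
    retained_refs = set().union(*result.retained_slices)
    return expired_refs - retained_refs


# Normal file group: a.mp4 is still referenced by a retained slice.
normal_fg = FileGroupCleanResult(
    "fg-1",
    retained_slices=[{"s3://media-bucket/videos/a.mp4"}],
    expired_slices=[{"s3://media-bucket/videos/a.mp4",
                     "s3://media-bucket/videos/b.mp4"}],
)

# Replaced file group: retained is empty, every slice is expired.
replaced_fg = FileGroupCleanResult(
    "fg-2",
    retained_slices=[],
    expired_slices=[{"s3://media-bucket/videos/a.mp4"},
                    {"s3://media-bucket/videos/c.mp4"}],
)

normal_candidates = local_orphan_candidates(normal_fg)
replaced_candidates = local_orphan_candidates(replaced_fg)
```

In this sketch `a.mp4` becomes a candidate from the replaced file group even though `fg-1` still retains a reference to it. The candidates are therefore only locally orphaned; the cross-FG verification in Stage 2 is what decides whether a path may enter `blobFilesToDelete`.
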
### Schema Changes: HoodieCleanMetadata A new nullable field `blobCleanStats` of type `HoodieBlobCleanStats`: -- `totalBlobFilesDeleted`, `totalBlobFilesRetained`, `totalContainersFlaggedForCompaction` +- `totalBlobFilesDeleted`, `totalBlobFilesRetained` - `totalBlobStorageReclaimed` - `deletedBlobFilePaths`, `failedBlobFilePaths` @@ -542,27 +470,26 @@ blob cleanup logic. Requires making `containsBlobType()` public (one-line visibi ## Concurrency & Safety -### Writer-Cleaner Race: Two-Sided Guard +### Writer-Cleaner Race: Conflict Check -**Hudi-created blobs (Flow 1): structurally impossible.** A new write creates new blob files with a -new instant in the path (C11). An UPSERT carries forward blob refs from retained slices (already -live). There is no mechanism for a writer to reference a Hudi-created blob that exists only in -expired slices. No writer-side check is needed for Flow 1. +Under Hudi's MVCC design, the cleaner and writers operate on non-overlapping file slices -- the +cleaner never conflicts with writers on file slice operations. However, external blob files are +**not** covered by MVCC: a writer's new file slice may reference an external blob that the cleaner +is simultaneously evaluating for deletion. -**External blobs (Flow 2): writer-side conflict check in `preCommit()`.** The gap between the -cleaner's planning-time snapshot and its actual file deletion is closed by a commit-time conflict -check: +**Writer-side conflict check in `preCommit()`.** The gap between the cleaner's planning-time +snapshot and its actual file deletion is closed by a commit-time conflict check: 1. Writers track external managed blob paths in `HoodieWriteStat.externalBlobPaths` (in-memory collection, no additional I/O). 2. 
At commit time (in `preCommit()`, under the existing transaction lock), the writer checks all three clean states -- COMPLETED, INFLIGHT, and REQUESTED -- because a REQUESTED plan can begin - executing at any moment (the REQUESTED→INFLIGHT transition doesn't acquire the transaction + executing at any moment (the REQUESTED->INFLIGHT transition doesn't acquire the transaction lock). It checks `deletedBlobFilePaths` (COMPLETED) and `blobFilesToDelete` (INFLIGHT/REQUESTED). 3. If any overlap is found, the commit is rejected with `HoodieWriteConflictException` and the writer retries. -Cost is zero for non-blob tables and Flow 1. For Flow 2: one timeline scan + 1-3 metadata reads. +Cost is zero for non-blob tables. For external blobs: one timeline scan + 1-3 metadata reads. ```mermaid sequenceDiagram @@ -605,14 +532,14 @@ sequenceDiagram | Operation | Concurrent with Blob Cleaner | Safety Mechanism | |--------------------------------|------------------------------|-----------------------------------------------------------| -| Regular write (INSERT/UPSERT) | Safe | C11 path uniqueness (Flow 1); writer-side check (Flow 2) | +| Regular write (INSERT/UPSERT) | Safe | Writer-side conflict check in preCommit() | | Compaction | Safe | `isFileSliceNeededForPendingMajorOrMinorCompaction` | -| Clustering | Safe | Replaced FG lifecycle; Stage 2 for external blobs | +| Clustering / insert_overwrite | Safe | Replaced FG lifecycle; Stage 2 finds refs in target FG | | Rollback | Safe | MOR over-retention; clean operates on post-rollback state | -| Savepoint create/delete | Safe | `isFileSliceExistInSavepointedFiles` | +| Restore | Safe | Clean operates on post-restore state | +| Savepoint create/delete | Safe | Savepointed slices excluded from cleaning | | Archival | No interaction | Blob cleaner reads file slices, not commit metadata | | Another cleaner instance | Safe | `TransactionManager`; `checkIfOtherWriterCommitted` | -| Blob compaction | Safe | Independent lifecycle | | MDT 
writes (index maintenance) | Safe | MDT commit atomicity | ### Crash Recovery @@ -624,7 +551,7 @@ cleaning: |-----------------------------------------|--------------------------------------------------------------------------------------------------------| | During planning (before plan persisted) | No REQUESTED instant on timeline. Cleaner starts fresh. | | After plan persisted, before execution | REQUESTED instant found; plan re-read and executed. | -| During execution (partial deletes) | INFLIGHT instant re-executed. Already-deleted files return FileNotFoundException → treated as success. | +| During execution (partial deletes) | INFLIGHT instant re-executed. Already-deleted files return FileNotFoundException -> treated as success. | | After execution, before COMPLETED | INFLIGHT re-executed. All deletes are no-ops. Metadata written, instant transitions to COMPLETED. | --- @@ -633,14 +560,13 @@ cleaning: ### Cost Summary -| Workload | Stage 1 cost | Stage 2 cost | Total per cleanup cycle | -|--------------------------|---------------------------------|----------------------------|--------------------------------| -| Non-blob table | Zero (`hasBlobColumns` gate) | N/A | **Zero** | -| Flow 1 (Hudi-created) | ~6 Parquet reads per cleaned FG | Skipped | O(cleaned_FGs * slices_per_FG) | -| Flow 2 (external, index) | ~6 Parquet reads per cleaned FG | O(C * R_avg) | O(cleaned_FGs + C * R_avg) | -| Flow 2 (external, scan) | ~6 Parquet reads per cleaned FG | O(candidates * table_size) | Circuit breaker limits this | +| Workload | Stage 1 cost | Stage 2 cost | Total per cleanup cycle | +|------------------------------|---------------------------------|----------------------------|--------------------------------| +| Non-blob table | Zero (`hasBlobColumns` gate) | N/A | **Zero** | +| External blobs (index) | ~6 Parquet reads per cleaned FG | O(C * R_avg) | O(cleaned_FGs + C * R_avg) | +| External blobs (scan) | ~6 Parquet reads per cleaned FG | O(candidates * table_size) | 
Circuit breaker limits this | -### Back-of-Envelope: Example 7 (50K FGs, 2K External Candidates) +### Back-of-Envelope: Example 6 (50K FGs, 2K External Candidates) | Parameter | Value | Notes | |-------------------------------------|-----------|------------------------------------------------------| @@ -677,33 +603,25 @@ contribute row-level blob refs as candidates. These are backed by engine-context | `hoodie.cleaner.blob.enabled` | `true` | Enable blob cleanup during clean action | | `hoodie.cleaner.blob.dry.run` | `false` | Compute blob cleanup plan and log results but do not execute | | `hoodie.cleaner.blob.external.scan.parallelism` | `10` | Parallelism for Stage 2 fallback table scan | -| `hoodie.cleaner.blob.external.scan.max.candidates` | `1000` | Circuit breaker for Stage 2 fallback scan; exceeding defers external blob cleanup | -| `hoodie.metadata.index.secondary.column` | (none) | Set to `.reference.external_path` for Flow 2 cross-FG verification | +| `hoodie.cleaner.blob.external.scan.max.candidates` | `1000` | Circuit breaker for Stage 2 fallback scan; exceeding defers blob cleanup | +| `hoodie.metadata.index.secondary.column` | (none) | Set to `.reference.external_path` for cross-FG verification | --- ## Rollout / Adoption Plan -Each stage can be implemented, tested, and shipped independently once the foundation layer is in -place (see [Independent Implementability](#independent-implementability)). - **Foundation (shared prerequisite).** `CleanPlanner` refactoring (policy methods return -`FileGroupCleanResult`), `BlobRef` type, schema changes (nullable `blobFilesToDelete` and -`containersToCompact` fields), and the `hasBlobColumns` zero-cost gate. - -**Stage 1 (per-FG cleanup).** Set-difference logic and dispatch by blob category. Produces -`hudi_blob_deletes` (immediate) and `external_candidates` (for Stage 2). +`FileGroupCleanResult`), schema changes (nullable `blobFilesToDelete` field), and the +`hasBlobColumns` zero-cost gate. 
-**Stage 2 (cross-FG verification) -- priority.** Flow 2 (external blobs) is the primary initial -use case -- cross-FG verification prevents premature deletion of shared blobs. Requires MDT + -record index + secondary index on `reference.external_path` (P6). Includes fallback table scan -with circuit breaker. +**Stage 1 (per-FG cleanup).** Set-difference logic. Produces local orphan candidates for Stage 2. -**Stage 3 (container lifecycle).** Delete-entire-file vs. flag-for-compaction at the container -level. Needed only when container files are used. +**Stage 2 (cross-FG verification) -- priority.** External blobs are the primary initial use case -- +cross-FG verification prevents premature deletion of shared blobs. Requires MDT + record index + +secondary index on `reference.external_path`. Includes fallback table scan with circuit breaker. -**Writer-side conflict check.** `preCommit()` conflict check for Flow 2 concurrency safety. -Closes the writer-cleaner race window. Independent of the three stages. +**Writer-side conflict check.** `preCommit()` conflict check for concurrency safety. Closes the +writer-cleaner race window. Independent of the two stages. ### Backward Compatibility @@ -721,20 +639,16 @@ Closes the writer-cleaner race window. Independent of the three stages. - **Stage 1 set-difference:** Verify correct orphan identification for COW and MOR file groups, including MOR over-retention (shadowed base refs kept until post-compaction). -- **Stage 1 dispatch:** Verify Hudi-created blobs route to `hudi_blob_deletes` and external blobs - route to `external_candidates`. - **Stage 2 index lookup:** Verify short-circuit behavior (stop after first live reference), empty results (globally orphaned), and batched prefix scans. - **Stage 2 fallback:** Verify table scan correctness and circuit breaker activation. -- **Stage 3 container lifecycle:** Verify delete-all-dead vs. flag-for-compaction decisions. 
- **Writer-side conflict check:** Verify detection of conflicts with COMPLETED, INFLIGHT, and REQUESTED clean actions. ### Integration Tests -- End-to-end clean cycle with Hudi-created blob table (COW and MOR). -- End-to-end clean cycle with external blob table and MDT secondary index. -- Clean cycle with replaced file groups (post-clustering). +- End-to-end clean cycle with external blob table and MDT secondary index (COW and MOR). +- Clean cycle with replaced file groups (post-clustering, post-insert_overwrite). ### Concurrency Tests @@ -752,7 +666,7 @@ Closes the writer-cleaner race window. Independent of the three stages. ## Appendix - **[Problem Statement, Constraints & Requirements](rfc-100-blob-cleaner-problem.md)** - -- Complete problem scope, all 13 constraints (C1-C13), all 10 requirements (R1-R10), 8 + -- Complete problem scope, all 11 constraints (C1-C11), all 9 requirements (R1-R9), 7 illustrative failure mode examples, and open questions. ### Why the MDT Secondary Index Maps to Record Keys (Not File Groups) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-problem.md b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md index c13d09f3e588e..ed8c5070b1b84 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-problem.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-problem.md @@ -1,12 +1,33 @@ -# Blob Cleaner: Problem Statement + + +# External Blob Cleaner: Problem Statement ## 1. Goal -When old file slices are cleaned, out-of-line blob files they reference may become orphaned -- still -consuming storage but unreachable by any query. The blob cleaner must identify and delete these -unreferenced blob files without premature deletion (deleting a blob that is still referenced by a live -record). This document defines the problem scope, design constraints, requirements, and illustrative -failure modes. It contains no solution content. 
+When old file slices are cleaned, external out-of-line blob files they reference may become orphaned +-- still consuming storage but unreachable by any query. The blob cleaner must identify and delete +these unreferenced blob files without premature deletion (deleting a blob that is still referenced by +a live record). This document defines the problem scope, design constraints, requirements, and +illustrative failure modes. It contains no solution content. + +This document focuses on **external blobs** -- the Phase 1 use case of RFC-100 where users have +existing blob files in external storage (e.g., `s3://media-bucket/videos/`) and Hudi manages the +*references* via the `BlobReference` schema, not the *storage layout*. --- @@ -14,68 +35,58 @@ failure modes. It contains no solution content. ### In scope -- Cleanup of **out-of-line blob files** when references to them exist only in expired (cleaned) file - slices. +- Cleanup of **external out-of-line blob files** (referenced via `BlobReference.external_path`) when + references to them exist only in expired (cleaned) file slices. - All table types: **COW** and **MOR**. - All cleaning policies: `KEEP_LATEST_COMMITS`, `KEEP_LATEST_FILE_VERSIONS`, `KEEP_LATEST_BY_HOURS`. -- Interaction with table services: **compaction**, **clustering**, **blob compaction**. -- Interaction with timeline operations: **savepoints**, **rollback**, **archival**. +- Interaction with table services: **compaction**, **clustering**. +- Interaction with replace commits: **clustering**, **insert_overwrite**, + **insert_overwrite_table**. +- Interaction with timeline operations: **savepoints**, **rollback**, **restore**, **archival**. - Single-writer and multi-writer (OCC) concurrency modes. -- Both **Hudi-created blobs** (stored under `{table}/.hoodie/blobs/...`) and **user-provided - external blobs** (arbitrary paths). - -### Two entry flows - -Blob cleanup must support two distinct entry flows. 
These are not edge cases of each other -- -they are co-equal paths with different properties, different volumes, and different cleanup costs. - -**Flow 1: Path-dispatched (Hudi-created blobs).** Blobs created by Hudi's write path and stored -under `{table}/.hoodie/blobs/{partition}/{col}/{instant}/{blob_id}`. The path structure guarantees -uniqueness (C11), file-group scoping, and eliminates cross-FG sharing for normal writes. This is the -expected majority flow for Phase 3 workloads. -**Flow 2: Non-path-dispatched (user-provided external blobs).** Users have existing blob files in -external storage (e.g., `s3://media-bucket/videos/`, a shared NFS mount, or any user-controlled -path). Records reference these blobs directly by path. The user does **not** want to bootstrap -- -they do not want Hudi to copy, move, or reorganize the blob files into `.hoodie/blobs/`. Hudi -manages the *references*, not the *storage layout*. This is the expected primary flow for Phase 1 -workloads and remains a supported flow in Phase 3. - -The non-path-dispatched flow has fundamentally different properties: - -| Property | Path-dispatched (Hudi-created) | Non-path-dispatched (external) | -|---------------------------|-----------------------------------|--------------------------------------| -| Path uniqueness | Guaranteed (instant in path, C11) | Not guaranteed (user controls) | -| Cross-FG sharing | Does not occur (FG-scoped) | Common (multiple records, same blob) | -| Writer/cleaner race | Cannot occur (D2) | Can occur (D3) | -| Delete-and-re-add (C2) | Eliminated | Real concern | -| Volume | Scales with writes | Can be large from day one | -| Per-FG cleanup sufficient | Yes | No -- cross-FG verification needed | - -Any solution that treats the non-path-dispatched flow as a rare edge case will fail at scale for -Phase 1 workloads. The cleanup algorithm must be efficient for **both** flows independently, and -must not impose the cost structure of one flow on the other. 
+| Property | External blobs | +|---------------------------|----------------------------------------------| +| Path uniqueness | Not guaranteed (user controls) | +| Cross-FG sharing | Common (multiple records, same blob) | +| Writer/cleaner race | Can occur (external paths outside MVCC) | +| Delete-and-re-add | Real concern (user controls paths) | +| Per-FG cleanup sufficient | No -- cross-FG verification needed | ### Out of scope - **Inline blobs.** Inline blob data lives inside the base/log file and is deleted when the file slice is cleaned. No additional cleanup needed. -- **Blob compaction internals.** Blob compaction (repacking partially-live container files) is a - separate service. This document defines the interface point (when to hand off to blob compaction) - but not its internal design. +- **Storing blob references in commit metadata.** Persisting blob reference sets within commit-level + metadata is an anti-pattern that does not scale and is not considered in this problem statement. - **Schema evolution.** Adding or removing blob columns does not change the cleanup problem. ### Stance on the `managed` flag -The BlobReference schema includes a `managed` boolean field -(`HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED`). The RFC states that only managed blobs are -cleaned. This document acknowledges the flag and treats it as a **filter** -- unmanaged blobs are -excluded from cleanup consideration. However, the cleanup design must be **correct regardless of the -flag's value**. The flag selects *which* blobs enter the cleanup pipeline; it must not be used as a -correctness lever within the pipeline itself. The flag may later serve as an optimization (skip -cleanup work for unmanaged blobs), but the problem statement and any solution must not depend on it -for safety. +The `BlobReference` schema includes a `managed` boolean field (`reference.managed`). The RFC states +that only managed blobs are cleaned. 
This document acknowledges the flag and treats it as a +**filter** -- unmanaged blobs are excluded from cleanup consideration. However, the cleanup design +must be **correct regardless of the flag's value**. The flag selects *which* blobs enter the cleanup +pipeline; it must not be used as a correctness lever within the pipeline itself. The flag may later +serve as an optimization (skip cleanup work for unmanaged blobs), but the problem statement and any +solution must not depend on it for safety. + +**Managed-to-unmanaged transitions.** An external blob's `reference.managed` flag can change across +writes. At time t1, a record references `s3://ext/video.mp4` with `managed=true`. At time t2, an +update to the same record changes `managed` to `false` for the same path. The blob was managed in +the expired slice but is unmanaged in the retained slice. The cleaner must decide: does the t1 +managed reference make the blob eligible for cleanup when t1 is expired, even though the t2 +reference says unmanaged? + +To safely handle this: If *any* retained reference to the same `external_path` exists (regardless of +the `managed` flag), the blob must not be deleted. The `managed` flag filters which blob references +*enter* the cleanup pipeline, but the liveness check must consider *all* references to a path, not +just managed ones. A transition from managed to unmanaged is effectively the user saying "Hudi +should stop managing this blob" -- the blob must survive the transition. + +The inverse case (unmanaged at t1, managed at t2) is straightforward: the blob is now managed and +subject to cleanup when t2's file slice eventually expires. --- @@ -94,23 +105,48 @@ Cleaning is a two-phase operation: ### Per-partition, per-file-group iteration -`CleanPlanner.getDeletePaths(partitionPath, earliestCommitToRetain)` iterates file groups within a -partition. 
For each file group, it compares file slices against the retention policy and produces a -list of `CleanFileInfo` objects (file paths to delete). The cleaner has no concept of cross-file-group -dependencies. +The cleaner iterates file groups within a partition. For each file group, it compares file slices +against the retention policy and produces a list of file paths to delete. The cleaner has no concept +of cross-file-group dependencies. ### Savepoint awareness The cleaner collects all savepointed timestamps and their associated data files. File slices that -overlap with savepointed files are excluded from cleaning -(`isFileSliceExistInSavepointedFiles`). This preserves the savepoint invariant: a savepoint freezes a -consistent snapshot including all data files it references. +overlap with savepointed files are excluded from cleaning. This preserves the savepoint invariant: a +savepoint freezes a consistent snapshot including all data files it references. ### OCC conflict resolution -`SimpleConcurrentFileWritesConflictResolutionStrategy` resolves write-write conflicts at the -`(partition, fileId)` granularity. There is no global serialization point. Concurrent writers to -different file groups proceed without contention. +Concurrent writer conflict resolution operates at the `(partition, fileId)` granularity. There is +no global serialization point. Concurrent writers to different file groups proceed without +contention. + +### MVCC: Cleaner does not conflict with writes + +Under Hudi's MVCC design, the cleaner and writers operate on non-overlapping sets of file slices. +Writers create new file slices; the cleaner deletes old (expired) ones. The cleaner only targets +file slices that are older than the retention boundary, which by definition are not being written to. +Consequently, the cleaner never conflicts with concurrent writers in the existing design -- there is +no write-clean contention on the same file slice. 
+ +Blob cleanup must preserve this property: it must not introduce scenarios where the cleaner and a +writer contend over the same blob file. However, external blob files are **not** covered by MVCC -- +a writer's new file slice may reference an external blob that the cleaner is simultaneously +evaluating for deletion. This gap is a central problem for external blob cleanup. + +### Rollback vs. Restore + +Hudi has two distinct undo operations that affect blob liveness: + +1. **Rollback of an inflight commit:** Removes the effects of a single uncommitted write and reverts + the affected file groups to their previous state. Scope is limited to one commit. +2. **Table restore to a savepoint:** Reverts the table to a prior consistent snapshot, potentially + undoing multiple committed writes. Scope can affect many file groups across many partitions. + +Blob cleanup must handle both: rollback may remove the sole reference to a blob (making it orphaned) +or resurrect a previously-shadowed reference, while restore may revert the table to a state where +different blob references are live. These are distinct failure modes with different scopes and are +addressed separately in the constraints. ### Critical gap @@ -131,38 +167,38 @@ any constraint leads to data corruption, premature deletion, or permanent orphan Once a blob file is written, its content never changes. Blob files are append-once, read-many. This means a blob file's identity is stable for its entire lifetime. -*Source: RFC-100 blob cleaner design, general storage semantics.* +*Source: RFC-100, general storage semantics.* ### C2: Delete-and-re-add same path -A blob file can be deleted from storage and a new file created at the same path with different -content. This is a real concern for user-provided external blobs (the user controls the path). For -Hudi-created blobs, it is structurally eliminated by C11 (instant in path guarantees uniqueness). 
- -*Source: RFC-100 blob cleaner design; alternatives analysis constraint C2.* - -### C3: Cross-file-group blob sharing +A blob file can be deleted from external storage and a new file created at the same path with +different content. Since the user controls external blob paths, path reuse is a real concern. The +cleanup algorithm must not assume that two references to the same `reference.external_path` at +different times refer to the same physical content. -An out-of-line blob can be referenced by records in multiple file groups and multiple partitions. This -is explicitly supported for user-provided external blobs: two records in different file groups can -point to the same external file. For Hudi-created blobs, cross-FG sharing does not occur because the -blob is created within a specific file group's storage scope (see C11). However, after clustering -(C8), references to the same Hudi-created blob could temporarily exist in both the source and target -file groups until the source is cleaned. +**Concurrent writer caveat.** Consider two concurrent writers, A and B, both referencing external +blob X at the same path. Writer A commits first. Later, the cleaner evaluates A's expired slice and +marks blob X as a deletion candidate. Meanwhile, Writer B is still inflight and also references +blob X. If the cleaner deletes blob X before Writer B commits, Writer B's commit creates a dangling +reference. This race exists because external blob paths are not under Hudi's control -- unlike file +slices, the cleaner cannot rely on MVCC guarantees for external blob files. Any cleanup solution +must account for inflight writers that may reference the same external blob path being considered +for deletion. 
-*Source: RFC-100 lines 196-198 (Option 1 scans all active file slices); alternatives analysis F6.* +*Source: RFC-100; external blob storage semantics.* -### C4: Container files +### C3: Cross-file-group blob sharing -Multiple blobs can be packed into a single container file, distinguished by `(offset, length)` within -the BlobReference. A container file can only be deleted when **all** byte ranges within it are -unreferenced. If some ranges are orphaned but others are still live, the container cannot be deleted -- -it must be handed off to blob compaction for repacking. +An external blob can be referenced by records in multiple file groups and multiple partitions. This +is explicitly supported: two records in different file groups can point to the same +`reference.external_path`. Cross-file-group sharing is the **common case** for external blobs +(e.g., a shared media library where a popular video appears in multiple user playlists across +partitions). Any cleanup algorithm that assumes blobs are scoped to a single file group will produce +premature deletions. -*Source: BlobReference schema fields `offset` and `length`; RFC-100 lines 164-165 (container config); -alternatives analysis F1.* +*Source: RFC-100 lines 196-198 (Option 1 scans all active file slices).* -### C5: MOR log updates shadow base file blob refs +### C4: MOR log updates shadow base file blob refs In MOR tables, a log file update to a record's blob reference supersedes the base file's blob reference for that record. The base file's blob ref appears live (it exists in an active file slice) @@ -173,63 +209,78 @@ update is later rolled back. *Source: RFC-100 line 122 (merge mode determines which blob reference is returned); MOR semantics.* -### C6: Existing cleaner is per-file-group scoped +### C5: Existing cleaner is per-file-group scoped -`CleanPlanner` iterates per `HoodieFileGroup` within each partition. It determines expired file slices -within a single file group. 
There is no existing mechanism to evaluate cross-file-group dependencies -during cleaning. +The cleaner iterates per file group within each partition. It determines expired file slices within +a single file group. There is no existing mechanism to evaluate cross-file-group dependencies during +cleaning. -*Source: `CleanPlanner.getDeletePaths()`, `CleanPlanner.getFilesToCleanKeepingLatestCommits()`; -alternatives analysis F11.* +*Source: `CleanPlanner.getDeletePaths()`, per-file-group iteration in +`getFilesToCleanKeepingLatestCommits()`.* -### C7: OCC is per-file-group (no global contention allowed) +### C6: OCC is per-file-group (no global contention allowed) -Concurrent writer conflict resolution operates at `(partition, fileId)` granularity. Any solution that -introduces a global contention point (global counter, global lock, global bitmap) violates this +Concurrent writer conflict resolution operates at `(partition, fileId)` granularity. Any solution +that introduces a global contention point (global counter, global lock, global bitmap) violates this constraint and degrades write throughput under concurrency. -*Source: `SimpleConcurrentFileWritesConflictResolutionStrategy`; alternatives analysis F12.* +*Source: Concurrent writer conflict resolution strategy; per-file-group OCC semantics.* + +### C7: Replace commits move blob refs between file groups + +Several operations produce `replacecommit` actions that replace one set of file groups with another: + +- **Clustering:** Reads records from source file groups and rewrites them to target file groups. For + external blobs, clustering copies the pointer (same `reference.external_path`) to the target file + group. After clustering, the source file group's slices still reference the original external + blobs until those slices are cleaned. The target file group's slices reference the same external + blobs. + +- **insert_overwrite / insert_overwrite_table:** Replaces an entire partition (or table) with new + data. 
The replacement records may reference entirely different external blobs, or the same ones, + or a mix. The replaced file groups become eligible for cleaning after retention expires. -### C8: Clustering moves blob refs between file groups +In all replace commit scenarios, the key property is: after the replace, both the old (replaced) and +new (replacement) file groups may reference the same external blob. The cleaner must not delete an +external blob referenced by a replaced file group if the replacement file group (or any other active +file group) still references it. -Clustering reads records from source file groups and rewrites them to target file groups. For -Hudi-managed blobs, clustering creates **new** blob files in the target file group. For external -blobs, clustering copies the pointer (same path, same offset/length) to the target file group. After -clustering, the source file group's slices still reference the original blobs until those slices are -cleaned. The target file group's slices reference either new blobs (Hudi-managed) or the same -external blobs. +For external blobs, the append-vs-replace distinction does not affect blob identity. The cleaner's +question is always: "Is this `external_path` referenced by any active file slice anywhere in the +table?" Whether the reference arrived via an append, an upsert, a clustering copy, or an +insert_overwrite is irrelevant -- if any live reference exists, the blob must not be deleted. -*Source: RFC-100 lines 212-214.* +*Source: Hudi replace commit semantics; clustering and insert_overwrite operations.* -### C9: Savepoints freeze file slices and their blob refs +### C8: Savepoints freeze file slices and their blob refs A savepoint preserves a consistent snapshot. File slices covered by a savepoint are excluded from cleaning. This means any blob referenced by a savepointed file slice must also be preserved, even if the blob would otherwise be considered orphaned. 
The cleaner already handles savepoint exclusion for file slices; blob cleanup must extend this guarantee to the blobs they reference. -*Source: `CleanPlanner.savepointedTimestamps`, `isFileSliceExistInSavepointedFiles()`.* +*Source: Savepoint handling in CleanPlanner.* -### C10: Rollback can invalidate or resurrect references +### C9: Rollback and restore can invalidate or resurrect references -Rolling back a commit can remove file slices that were the sole reference to a blob (the blob becomes -orphaned). Conversely, rolling back a commit that updated a record's blob reference can resurrect the -previous reference (an older blob that appeared orphaned is now live again). Any blob cleanup solution -must account for both directions. +Two distinct undo operations affect blob liveness: -*Source: Hudi rollback semantics; timeline management.* +**Rollback of an inflight commit:** Can remove file slices that were the sole reference to a blob +(the blob becomes orphaned). Conversely, rolling back a commit that updated a record's blob +reference can resurrect the previous reference (an older blob that appeared orphaned is now live +again). -### C11: Hudi-created blob paths include instant (structurally unique) +**Table restore to a savepoint:** Can undo multiple committed writes simultaneously. All blob +references introduced after the restore point become orphaned. All blob references that were live at +the restore point are resurrected. The scope is broader than single-commit rollback: it may affect +many file groups across many partitions simultaneously. -Hudi-created blob files are stored at -`{table_path}/.hoodie/blobs/{partition}/{column_name}/{instant}/{blob_id}`. Because the commit -instant is embedded in the path, two different writes always produce different blob paths. This -eliminates the delete-and-re-add problem (C2) for Hudi-created blobs and means they are inherently -scoped to a single file group's write context. 
+Any blob cleanup solution must account for both directions (orphaning and resurrection) under both +operations. -*Source: RFC-100 line 170; alternatives analysis F3.* +*Source: Hudi rollback and restore semantics; timeline management.* -### C12: Archival removes commit metadata from active timeline +### C10: Archival removes commit metadata from active timeline Hudi's archival process moves completed commits from the active timeline to the archived timeline. If blob cleanup depends on information in commit metadata (e.g., which blobs were written by a @@ -237,18 +288,23 @@ commit), that information becomes unavailable after archival unless it is persis cleaner must either complete blob reference resolution before archival, or ensure the necessary information survives archival. -*Source: Hudi archival semantics; `HoodieActiveTimeline` vs `HoodieArchivedTimeline`.* +Note: Storing blob reference sets within commit metadata would compound this problem -- commit +metadata grows with the number of blob references, and archival would either lose or have to +specially preserve this information. This is an additional reason not to use commit metadata as the +source of truth for blob reference liveness. -### C13: Non-path-dispatched blobs require cross-FG verification at scale +*Source: Hudi archival semantics.* -For the non-path-dispatched flow (Flow 2), cross-file-group blob sharing (C3) is the **common -case**, not an edge case. Users referencing external blobs (e.g., a shared media library) will -frequently have multiple records across different file groups and partitions pointing to the same -blob file. Any cleanup algorithm that treats cross-FG verification as a rare fallback will impose -disproportionate cost on Flow 2 workloads. The cross-FG verification path must be designed for -volume, not just correctness. 
+### C11: External blobs require cross-file-group verification at scale -*Source: Two entry flows (Section 2); C3; alternatives analysis D1, D3.* +For external blobs, cross-file-group blob sharing (C3) is the **common case**, not an edge case. +Users referencing external blobs (e.g., a shared media library) will frequently have multiple +records across different file groups and partitions pointing to the same blob file. Any cleanup +algorithm that treats cross-FG verification as a rare fallback will impose disproportionate cost on +external blob workloads. The cross-FG verification path must be designed for volume, not just +correctness. + +*Source: C3 (cross-FG sharing is common for external blobs).* --- @@ -266,63 +322,52 @@ Every orphaned blob must eventually be cleaned. The number of cleanup cycles req orphan must be bounded (e.g., cleaned within N cleaner invocations after the last referencing file slice is expired). Unbounded accumulation of orphaned blobs wastes storage indefinitely. -### R3: Container awareness (range-level liveness) - -Cleanup must track liveness at the `(path, offset, length)` tuple level, not just the path level. -A container file may have some live ranges and some dead ranges. Only when all ranges are dead can the -container file be deleted. Partially-dead containers should be flagged for blob compaction. - -### R4: MOR correctness +### R3: MOR correctness For MOR tables, blob cleanup must be safe in the presence of log updates that shadow base file blob references. Over-retention (keeping a shadowed blob until post-compaction) is acceptable. Under-retention (prematurely deleting a blob whose reference appears shadowed but could be resurrected by rollback) is not. -### R5: Concurrency safety (no global serialization) +### R4: Concurrency safety (no global serialization) Blob cleanup must not introduce global contention points. 
Write throughput for tables with blobs must -not degrade compared to tables without blobs under concurrent writers. Per-file-group scoping (C7) +not degrade compared to tables without blobs under concurrent writers. Per-file-group scoping (C6) must be preserved. -### R6: Scale proportional to work, not table size +### R5: Scale proportional to work, not table size -For path-dispatched blobs (Flow 1): the cost of blob cleanup must be proportional to the number of -file groups being cleaned, not the total table size. A table with 100K file groups cleaning 1K of -them must not scan all 100K file groups. +Cross-FG verification is required for external blobs (C11), but the cost must be proportional to the +number of **candidate blobs requiring verification**, not the total number of active file slices in +the table. A table with 100K file groups where 50 external blob candidates need cross-FG +verification must not scan all 100K file groups -- it must use targeted lookups or indexes to resolve +those 50 candidates efficiently. -For non-path-dispatched blobs (Flow 2): cross-FG verification is required (C13), but the cost must -be proportional to the number of **candidate blobs requiring verification**, not the total number of -active file slices in the table. A table with 100K file groups where 50 external blob candidates -need cross-FG verification must not scan all 100K file groups -- it must use targeted lookups or -indexes to resolve those 50 candidates efficiently. - -### R7: No cost for non-blob tables +### R6: No cost for non-blob tables Tables without blob columns must pay zero additional cost. The blob cleanup path must not be entered if no blob columns exist. This includes no additional metadata, no additional timeline entries, and no additional I/O. 
-### R8: All cleaning policies supported +### R7: All cleaning policies supported Blob cleanup must work correctly under all three cleaning policies: `KEEP_LATEST_COMMITS`, `KEEP_LATEST_FILE_VERSIONS`, and `KEEP_LATEST_BY_HOURS`. The blob cleanup logic should be policy-agnostic -- it operates on the set of expired vs. retained file slices determined by the policy, not on the policy itself. -### R9: Crash safety and idempotency +### R8: Crash safety and idempotency If the cleaner crashes after planning but before completing all deletions, restarting must be safe. Blob deletions must be idempotent (deleting an already-deleted file is a no-op, not an error). The cleaner plan must include enough information to resume blob cleanup without re-reading expired file slices (which may no longer exist after a partial execution). -### R10: Observability +### R9: Observability Blob cleanup must report metrics: number of blob files deleted, number of blob files retained -(over-retained due to MOR or containers), number of container files flagged for blob compaction, -and total storage reclaimed. These metrics enable operators to understand blob storage growth and -cleanup effectiveness. +(over-retained due to MOR), and total storage reclaimed. These metrics enable operators to understand +blob storage growth and cleanup effectiveness. --- @@ -333,23 +378,23 @@ make the constraints and requirements concrete. 
### Example 1: Cross-file-group sharing -- per-FG cleanup deletes shared blob -**Demonstrates:** C3, C6, R1 +**Demonstrates:** C3, C5, R1 ``` Setup: Partition P1, File Group FG-1: - Slice @t1: row1.blob_ref = (s3://shared/video.mp4, 0, 10MB, managed=true) + Slice @t1: row1.blob_ref = {external_path: "s3://shared/video.mp4", managed: true} Partition P2, File Group FG-2: - Slice @t1: row2.blob_ref = (s3://shared/video.mp4, 0, 10MB, managed=true) + Slice @t1: row2.blob_ref = {external_path: "s3://shared/video.mp4", managed: true} Action: Cleaner expires FG-1's slice @t1 (no retained slices in FG-1). Per-FG cleanup (incorrect): - FG-1 expired refs = {(s3://shared/video.mp4, 0, 10MB)} + FG-1 expired refs = {"s3://shared/video.mp4"} FG-1 retained refs = {} - Orphaned within FG-1 = {(s3://shared/video.mp4, 0, 10MB)} + Orphaned within FG-1 = {"s3://shared/video.mp4"} -> DELETE s3://shared/video.mp4 Result: @@ -358,58 +403,27 @@ Result: Correct behavior: Before deleting, verify that no other active file slice in any file group - references (s3://shared/video.mp4, 0, 10MB). FG-2's active slice references - it, so the blob must be retained. -``` - -### Example 2: Container file partial liveness -- deleting container destroys live ranges - -**Demonstrates:** C4, R3 - -``` -Setup: - File Group FG-1: - Slice @t1: row1.blob_ref = (container_A.bin, 0, 1MB, managed=true) - Slice @t2: row2.blob_ref = (container_A.bin, 1MB, 2MB, managed=true) - -Action: - Cleaner expires slice @t1, retains slice @t2. - -Path-level cleanup (incorrect): - Expired paths = {container_A.bin} - Retained paths = {container_A.bin} - container_A.bin is in both sets -> retain. - (This happens to be safe by accident, but only because the same path appears.) 
- - Alternative scenario -- different slices, same container: - Slice @t1 in FG-1: row1.blob_ref = (container_A.bin, 0, 1MB) -- expired - Slice @t1 in FG-2: row2.blob_ref = (container_A.bin, 1MB, 2MB) -- retained - - If FG-1 concludes container_A.bin is orphaned (no retained refs within FG-1): - -> DELETE container_A.bin - FG-2's live range at offset 1MB is destroyed. - -Correct behavior: - Track liveness at (path, offset, length). Only delete the container file when - ALL ranges are unreferenced. If some ranges are dead and others live, flag the - container for blob compaction instead of deleting it. + references s3://shared/video.mp4. FG-2's active slice references it, so + the blob must be retained. ``` -### Example 3: Delete-and-re-add -- path reuse causes identity confusion +### Example 2: Delete-and-re-add -- path reuse causes identity confusion **Demonstrates:** C2, R1 ``` Setup: - At time t1: User writes row1.blob_ref = (s3://user/photo.jpg, managed=true) + At time t1: User writes row1 with + blob_ref = {external_path: "s3://user/photo.jpg", managed: true} At time t2: User deletes the file at s3://user/photo.jpg externally - At time t3: User writes row2.blob_ref = (s3://user/photo.jpg, managed=true) - (new file at same path, different content) + At time t3: User writes row2 with + blob_ref = {external_path: "s3://user/photo.jpg", managed: true} + (new file at same path, different content) Cleanup scenario: Cleaner expires slice @t1. Slice @t3 is retained. - Expired refs = {s3://user/photo.jpg} - Retained refs = {s3://user/photo.jpg} + Expired refs = {"s3://user/photo.jpg"} + Retained refs = {"s3://user/photo.jpg"} Same path in both sets -> retain. (Correct by coincidence.) But consider: if the cleaner had cached blob identity by path and assumed @@ -421,36 +435,43 @@ Cleanup scenario: AGAIN (third incarnation), the cleaner's identity model must not confuse the three incarnations. 
-Note: This problem does not arise for Hudi-created blobs (C11 -- instant in -path guarantees each write produces a unique path). +Concurrent writer scenario (C2 caveat): + Writer A commits at t1, referencing s3://user/photo.jpg (managed=true). + Writer B starts at t0, also referencing s3://user/photo.jpg (managed=true). + Cleaner expires A's slice at t1. B is still inflight. + Cleaner deletes s3://user/photo.jpg. + Writer B commits -> dangling reference. Data corruption. + + This race is specific to external blobs: the cleaner cannot rely on MVCC + guarantees for external blob files that are outside Hudi's control. ``` -### Example 4: MOR log shadow -- base file ref appears live when superseded +### Example 3: MOR log shadow -- base file ref appears live when superseded -**Demonstrates:** C5, R4 +**Demonstrates:** C4, R3 ``` Setup (MOR table): File Group FG-1: - Base file @t1: row1.blob_ref = (blob_A.bin, managed=true) - Log file @t2: row1.blob_ref = (blob_B.bin, managed=true) -- update + Base file @t1: row1.blob_ref = {external_path: "s3://ext/blob_A.bin", managed: true} + Log file @t2: row1.blob_ref = {external_path: "s3://ext/blob_B.bin", managed: true} - After merge: row1's effective blob_ref is blob_B.bin. + After merge: row1's effective blob_ref points to blob_B.bin. blob_A.bin is no longer referenced by any live record. Cleanup scenario (pre-compaction): Cleaner does not expire slice @t1 (it's retained). Reading blob refs from the retained slice: - Base @t1: {blob_A.bin} - Log @t2: {blob_B.bin} - Union: {blob_A.bin, blob_B.bin} + Base @t1: {"s3://ext/blob_A.bin"} + Log @t2: {"s3://ext/blob_B.bin"} + Union: {"s3://ext/blob_A.bin", "s3://ext/blob_B.bin"} blob_A.bin appears live (it's in the retained set) even though it's been superseded by the log update. This is over-retention -- safe but wasteful. After compaction: - Compacted base @t3: row1.blob_ref = (blob_B.bin, managed=true) - Now the only retained ref is {blob_B.bin}. 
+ Compacted base @t3: row1.blob_ref = {external_path: "s3://ext/blob_B.bin", managed: true} + Now the only retained ref is {"s3://ext/blob_B.bin"}. blob_A.bin is no longer in any retained set -> eligible for deletion. Why over-retention is the correct default: @@ -459,15 +480,21 @@ Why over-retention is the correct default: dangling reference. Over-retention prevents this. ``` -### Example 5: Writer-cleaner race -- three scenarios +### Example 4: Writer-cleaner race -- three scenarios -**Demonstrates:** C7, R1, R5 +**Demonstrates:** C6, R1, R4 ``` A writer and cleaner operate concurrently on the same table. +Note: Under Hudi's MVCC design, the cleaner and writers operate on non-overlapping +file slices -- the cleaner never conflicts with writers on file slice operations. +However, external blob files are NOT covered by MVCC: a writer's new file slice may +reference an external blob that the cleaner is simultaneously evaluating for deletion. +The following scenarios illustrate this gap. + Scenario A: Writer commits BEFORE cleaner's timeline fence - t1: Writer starts, references blob_X + t1: Writer starts, references blob_X (external) t2: Writer commits (blob_X is now in a retained slice) t3: Cleaner plans cleanup t4: Cleaner checks timeline fence -- sees writer's commit at t2 @@ -477,7 +504,7 @@ Scenario A: Writer commits BEFORE cleaner's timeline fence Scenario B: Writer commits AFTER cleaner's timeline fence, BEFORE delete t1: Cleaner plans cleanup, blob_X is a candidate for deletion t2: Cleaner checks timeline fence -- no new commits - t3: Writer commits, referencing blob_X + t3: Writer commits, referencing blob_X (external) t4: Cleaner deletes blob_X -> UNSAFE. The timeline fence did not see the writer's commit. blob_X is deleted, but the writer's new slice references it. @@ -488,89 +515,83 @@ Scenario C: Writer commits AFTER cleaner deletes -> UNSAFE. The blob is already gone. The writer's commit creates a dangling reference. 
-Note: Scenario B and C cannot occur for Hudi-created blobs (C11 + D2 from -alternatives analysis: new writes always produce new paths, and UPSERT carries -forward refs from retained slices). They are real concerns for user-provided -external blobs where the user can reference any path. +Scenarios B and C are real concerns for external blobs where the user can +reference any path. Any solution must close this gap -- for example, via a +writer-side conflict check at commit time. ``` -### Example 6: Clustering moves refs -- replaced FG appears to have no retained slices +### Example 5: Replace commits move refs -- replaced FG appears to have no retained slices -**Demonstrates:** C8, C3, R1 +**Demonstrates:** C7, C3, R1 ``` Setup: File Group FG-1: - Slice @t1: row1.blob_ref = (blob_A.bin, managed=true) + Slice @t1: row1.blob_ref = {external_path: "s3://ext/video.mp4", managed: true} Clustering at t2 rewrites FG-1's records to FG-2: File Group FG-2: - Slice @t2: row1.blob_ref = (blob_A_new.bin, managed=true) - (Hudi-managed: new blob created in FG-2's scope) + Slice @t2: row1.blob_ref = {external_path: "s3://ext/video.mp4", managed: true} + (external: pointer copied, same blob) FG-1 is now a replaced file group. Its slice @t1 is eligible for cleaning after the retention policy expires. -Cleanup: +Cleanup (incorrect per-FG only): Cleaner cleans FG-1's slice @t1. - Expired refs = {blob_A.bin} + Expired refs = {"s3://ext/video.mp4"} Retained refs within FG-1 = {} (FG-1 has no retained slices -- it's replaced) - blob_A.bin appears orphaned within FG-1 -> DELETE - - This is actually CORRECT for Hudi-managed blobs: clustering created a new - blob (blob_A_new.bin) in FG-2, so blob_A.bin is genuinely orphaned. 
- - BUT if the blob were an external user-provided blob (same path referenced - from both FG-1 and FG-2 after clustering): + s3://ext/video.mp4 appears orphaned within FG-1 -> DELETE - File Group FG-2: - Slice @t2: row1.blob_ref = (s3://ext/video.mp4, managed=true) - (external: pointer copied, same blob) + FG-2's live reference is destroyed. Data corruption. - FG-1's cleanup concludes s3://ext/video.mp4 is orphaned within FG-1. - Deleting it destroys FG-2's live reference. +Alternative scenario -- insert_overwrite: + At t2, an insert_overwrite replaces partition P1 with new data in FG-3. + FG-3's records also reference s3://ext/video.mp4 (same external blob). + FG-1 is replaced. Same per-FG cleanup error: FG-1 concludes the blob is + orphaned, but FG-3 still references it. Correct behavior: - For Hudi-managed blobs, per-FG cleanup is safe because clustering always - creates new blob files in the target FG. For external blobs, a cross-FG - check is required before deletion. + For external blobs, a cross-FG check is required before deletion, regardless + of which replace commit type (clustering, insert_overwrite, insert_overwrite_table) + created the replacement. Per-FG cleanup alone is never sufficient for external blobs. ``` -### Example 7: Non-bootstrapped external blobs at scale -- cross-FG verification is the common path +### Example 6: External blobs at scale -- cross-FG verification is the common path -**Demonstrates:** C3, C13, R6 (Flow 2) +**Demonstrates:** C3, C11, R5 ``` Setup: A media company stores 10M video files in s3://media-library/. They create a Hudi table with a blob column referencing these videos. - They do NOT bootstrap -- Hudi manages refs, not storage layout. + Hudi manages refs, not storage layout. The table has 50K file groups across 1K partitions. Many videos are referenced by multiple records (e.g., a popular video appears in multiple user playlists across different partitions). 
Partition users/alice, FG-101: - Slice @t1: row1.blob_ref = (s3://media-library/video_X.mp4, managed=true) + Slice @t1: row1.blob_ref = {external_path: "s3://media-library/video_X.mp4", managed: true} Partition users/bob, FG-202: - Slice @t1: row2.blob_ref = (s3://media-library/video_X.mp4, managed=true) + Slice @t1: row2.blob_ref = {external_path: "s3://media-library/video_X.mp4", managed: true} Partition users/carol, FG-303: - Slice @t1: row3.blob_ref = (s3://media-library/video_X.mp4, managed=true) + Slice @t1: row3.blob_ref = {external_path: "s3://media-library/video_X.mp4", managed: true} Action: Cleaner expires FG-101's slice @t1 (alice deleted her playlist entry). Naive per-FG cleanup (incorrect): - FG-101 expired refs = {(s3://media-library/video_X.mp4, 0, 50MB)} + FG-101 expired refs = {"s3://media-library/video_X.mp4"} FG-101 retained refs = {} Orphaned within FG-101 -> DELETE video_X.mp4 Bob and Carol lose their video. Data corruption. Naive full-table scan (correct but expensive): To verify video_X.mp4 is safe to delete, scan ALL 50K file groups - for references. This is correct but violates R6 -- the cost is + for references. This is correct but violates R5 -- the cost is proportional to table size, not to the number of candidates. Scale concern: @@ -587,39 +608,41 @@ Correct behavior: element, not a fallback path. 
``` -### Example 8: MOR log-chain transient blob -- introduced and superseded within logs +### Example 7: MOR log-chain transient blob -- introduced and superseded within logs -**Demonstrates:** C5, R2 +**Demonstrates:** C4, R2 ``` Setup (MOR table): File Group FG-1: - Base file @t1: row1.blob_ref = (blob_A.bin, managed=true) - Log file @t2: row1.blob_ref = (blob_B.bin, managed=true) -- update - Log file @t3: row1.blob_ref = (blob_C.bin, managed=true) -- another update + Base file @t1: row1.blob_ref = {external_path: "s3://ext/blob_A.bin", managed: true} + Log file @t2: row1.blob_ref = {external_path: "s3://ext/blob_B.bin", managed: true} + Log file @t3: row1.blob_ref = {external_path: "s3://ext/blob_C.bin", managed: true} - After merge: row1's effective blob_ref is blob_C.bin. + After merge: row1's effective blob_ref points to blob_C.bin. blob_B.bin was introduced at t2 and superseded at t3 -- it exists ONLY in log @t2. After compaction @t4: - Compacted base @t4: row1.blob_ref = (blob_C.bin, managed=true) + Compacted base @t4: row1.blob_ref = {external_path: "s3://ext/blob_C.bin", managed: true} The pre-compaction slice (base @t1 + logs @t2, @t3) is now expired. Cleanup scenario: Cleaner expires the pre-compaction slice. - Retained slice = compacted base @t4, refs = {blob_C.bin}. + Retained slice = compacted base @t4, refs = {"s3://ext/blob_C.bin"}. If expired slice reads only the base file: - expired_refs = {blob_A.bin} (from base @t1) - local_orphans = {blob_A.bin} - {blob_C.bin} = {blob_A.bin} + expired_refs = {"s3://ext/blob_A.bin"} (from base @t1) + local_orphans = {"s3://ext/blob_A.bin"} - {"s3://ext/blob_C.bin"} + = {"s3://ext/blob_A.bin"} blob_A.bin is correctly identified as orphaned. blob_B.bin is MISSED -- it exists only in expired log @t2. blob_B.bin becomes a permanent orphan (R2 violation). 
If expired slice reads base + log files: - expired_refs = {blob_A.bin, blob_B.bin, blob_C.bin} - local_orphans = {blob_A.bin, blob_B.bin, blob_C.bin} - {blob_C.bin} - = {blob_A.bin, blob_B.bin} + expired_refs = {"s3://ext/blob_A.bin", "s3://ext/blob_B.bin", "s3://ext/blob_C.bin"} + local_orphans = {"s3://ext/blob_A.bin", "s3://ext/blob_B.bin", "s3://ext/blob_C.bin"} + - {"s3://ext/blob_C.bin"} + = {"s3://ext/blob_A.bin", "s3://ext/blob_B.bin"} Both orphaned blobs are correctly identified and deleted. Why this matters: @@ -706,23 +729,24 @@ These questions must be answered by any solution design. They are not prescripti answers exist for each. **Q1: What is blob identity?** -How does the cleanup algorithm identify a specific blob? By path alone? By the tuple -`(path, offset, length)`? By `(path, generation/version)`? The identity model determines how -deduplication, container handling, and delete-and-re-add (C2) are handled. +How does the cleanup algorithm identify a specific blob? By `reference.external_path` alone? For +external blobs where container files are out of scope, path-based identity may be sufficient. The +identity model determines how deduplication and delete-and-re-add (C2) are handled. **Q2: Where is liveness computed?** Is the set of live blob references computed at write time (incremental), at clean time (batch), or some combination? Write-time computation amortizes cost but requires additional metadata storage. -Clean-time computation avoids write overhead but may be expensive at scale. +Clean-time computation avoids write overhead but may be expensive at scale. Note: storing liveness +data within commit metadata is not a viable option -- it does not scale (see C10). **Q3: What is the unit of cleanup planning?** Does blob cleanup plan per-file-group (aligned with the existing cleaner), per-partition, or globally? -Per-FG is naturally aligned with OCC (C7) but cannot handle cross-FG sharing (C3) without extension. 
-Global planning handles cross-FG sharing but risks violating C7. +Per-FG is naturally aligned with OCC (C6) but cannot handle cross-FG sharing (C3) without extension. +Global planning handles cross-FG sharing but risks violating C6. **Q4: How does blob cleanup interact with archival?** If the cleanup algorithm depends on commit metadata to determine which blobs were written, what -happens when those commits are archived (C12)? Must blob cleanup complete before archival? Must the +happens when those commits are archived (C10)? Must blob cleanup complete before archival? Must the relevant metadata be persisted outside the active timeline? **Q5: Extension or separate service?** @@ -736,10 +760,22 @@ Despite best efforts, what happens if a blob is prematurely deleted? Is there a (query-time error surfacing)? Is there a recovery path (rebuild from an external source)? How does the system distinguish "blob correctly not present" from "blob incorrectly deleted"? -**Q7: How does cross-FG verification scale for non-path-dispatched blobs?** -For Flow 2 workloads where cross-FG sharing is common, what mechanism makes cross-FG verification -efficient? Options include: an MDT index mapping blob paths to referencing file groups, predicate -pushdown on the blob ref column during targeted scans, a reference count maintained at write time, -or a bloom filter index. The chosen mechanism must satisfy R6 (cost proportional to candidates, not -table size) and C7 (no global serialization). How does this mechanism interact with writes, and what -is its maintenance cost? +**Q7: How does cross-FG verification scale for external blobs?** +For external blob workloads where cross-FG sharing is common, what mechanism makes cross-FG +verification efficient? Options include: an MDT index mapping blob paths to referencing file groups, +predicate pushdown on the blob ref column during targeted scans, a reference count maintained at +write time, or a bloom filter index. 
The chosen mechanism must satisfy R5 (cost proportional to +candidates, not table size) and C6 (no global serialization). How does this mechanism interact with +writes, and what is its maintenance cost? + +**Q8: How should the cleaner handle managed-to-unmanaged transitions?** +If a blob reference transitions from `reference.managed = true` to `reference.managed = false` +across writes (or vice versa), what is the cleaner's behavior? Should the `managed` flag be +evaluated at the time of the expired reference, the retained reference, or both? See the managed +flag discussion in Section 2. + +**Q9: How does blob cleanup interact with table restore?** +Table restore (as distinct from single-commit rollback) can undo multiple committed writes, +potentially orphaning many blobs at once and resurrecting others. Does the cleaner need special +handling for post-restore cleanup, or does the standard cleanup algorithm handle it naturally? What +if a restore occurs after a cleaner run has already deleted blobs that the restored state references? From 6dc90a8948a28b5f4b7f60ae8273a7a34c2f0ce0 Mon Sep 17 00:00:00 2001 From: voon Date: Mon, 6 Apr 2026 16:38:35 +0800 Subject: [PATCH 6/6] Update RFC to include sidecar parquet file to contain blob delete files --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 165 +++++++++++++++------ 1 file changed, 122 insertions(+), 43 deletions(-) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index 7ae9335730859..c8f1ea0fc3ee7 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -133,6 +133,7 @@ with the number of candidates, not the table size. 
| Blob identity | `reference.external_path` | Path-based identity for external blobs | | Cleanup scope | Per-FG candidate identification + cross-FG verification | Aligns with OCC (C6) and existing cleaner (C5); scales for C11 | | Cross-FG mechanism | MDT secondary index on `reference.external_path` | Short-circuits on first non-cleaned FG ref | +| Blob delete storage | Sidecar Parquet file (`.hoodie/.aux/clean/`) | Avoids plan bloat; durable artifact for writer conflict checks | | MOR strategy | Over-retain (union of base + log refs) | Safe (C4, R3); cleaned after compaction | ```mermaid @@ -151,24 +152,27 @@ flowchart LR end S1 --> S2["Stage 2
Cross-FG verification
(MDT secondary index)"] + S2 --> SC["Write sidecar Parquet
.hoodie/.aux/clean/<instant>
.blob_deletes.parquet"] end subgraph Plan["HoodieCleanerPlan"] FP["filePathsToBeDeleted
(existing)"] - BP["blobFilesToDelete
(new)"] + EM["extraMetadata[blobDeletesPath]
(pointer to sidecar)"] end - S2 --> BP + SC --> EM CP --> FP subgraph Execution["CleanActionExecutor.runClean()"] direction TB + RS["Read sidecar Parquet"] DF["Delete file slices
(existing, parallel)"] DB["Delete blob files
(new, parallel)"] + RS --> DB end FP --> DF - BP --> DB + EM --> RS ``` --- @@ -362,14 +366,16 @@ sequenceDiagram │ └── If hasBlobColumns: Stage 1 per FG ├── CleanPlanner: replaced file groups -> Stage 1 ├── If local orphan candidates non-empty: Stage 2 - ├── Build HoodieCleanerPlan (+ blobFilesToDelete) + ├── Write sidecar Parquet to .hoodie/.aux/clean/<instant>.blob_deletes.parquet + ├── Build HoodieCleanerPlan with extraMetadata["blobDeletesPath"] └── Persist plan to timeline (REQUESTED state) 2. CleanActionExecutor.runClean() ├── Transition to INFLIGHT + ├── Read sidecar Parquet (blob delete list) ├── Delete file slices (existing, parallelized) - ├── Delete blob files (new, same parallelized pattern) - ├── Build HoodieCleanMetadata with blobCleanStats + ├── Delete blob files (new, parallelized) // parallel with file slice deletes + ├── Build HoodieCleanMetadata with blobCleanStats + blobDeletesSidecarPath └── Transition to COMPLETED ``` @@ -377,6 +383,7 @@ sequenceDiagram sequenceDiagram participant P as CleanPlanActionExecutor participant TL as Timeline + participant AUX as .hoodie/.aux/clean/ participant E as CleanActionExecutor participant S as Storage @@ -384,28 +391,31 @@ sequenceDiagram P->>P: Stage 1: per-FG blob ref set difference P->>P: Stage 2: MDT index lookup (if candidates exist) - P->>TL: Persist HoodieCleanerPlan + P->>AUX: Write sidecar Parquet (blob delete list) + P->>TL: Persist HoodieCleanerPlan
(extraMetadata["blobDeletesPath"]) Note over TL: REQUESTED rect rgb(255, 245, 230) - Note right of TL: Crash here → restart fresh
(no plan persisted yet) + Note right of TL: Crash before plan → orphaned sidecar
(harmless, cleaned at next startup) end E->>TL: Transition plan state Note over TL: INFLIGHT + E->>AUX: Read sidecar Parquet + rect rgb(255, 245, 230) - Note right of TL: Crash here → re-execute plan
(idempotent: FileNotFound = success) + Note right of TL: Crash here → re-read sidecar,
re-delete (FileNotFound = success) end par Parallel deletion E->>S: Delete file slices (existing) and - E->>S: Delete blob files (new) + E->>S: Delete blob files (from sidecar) end - E->>E: Build HoodieCleanMetadata
(+ blobCleanStats) + E->>E: Build HoodieCleanMetadata
(blobCleanStats + sidecarPath) E->>TL: Transition plan state rect rgb(255, 245, 230) @@ -413,6 +423,7 @@ sequenceDiagram end Note over TL: COMPLETED + Note over AUX: Sidecar lives until archival
(for writer conflict checks) ``` --- @@ -448,10 +459,46 @@ produces `FileGroupCleanResult` objects with `retainedSlices = empty` and ### Schema Changes: HoodieCleanerPlan -One new nullable field with null default (backward compatible): +**No new fields.** The blob delete list is stored in a sidecar Parquet file, not in the plan. The +plan references the sidecar via the existing `extraMetadata` map: + +- `extraMetadata["blobDeletesPath"]` -- path to the sidecar Parquet file + (e.g., `.hoodie/.aux/clean/20240101120000.blob_deletes.parquet`) + +This avoids plan bloat regardless of the number of blob candidates. The `extraMetadata` map is +already part of the `HoodieCleanerPlan` Avro schema -- no schema change is needed. + +### Sidecar Parquet File: Blob Delete List + +The blob delete list is stored as a sidecar Parquet file at +`.hoodie/.aux/clean/.blob_deletes.parquet`. This file is the **single source of truth** +for blob delete candidates across all clean states (REQUESTED, INFLIGHT, COMPLETED). + +**Schema:** +``` +message BlobDeleteList { + required binary external_path (STRING); +} +``` + +**Write sequence:** The sidecar is written **before** the plan is persisted to the timeline. +By the time the plan is visible, the sidecar is already durable on storage. This ensures +atomicity: if a writer sees the plan, the sidecar is guaranteed to exist. -- **`blobFilesToDelete`**: `List` -- external blob files confirmed as - globally orphaned. The executor deletes these. +**Lifecycle:** The sidecar lives until the clean instant is **archived**. This covers writer +conflict checks against REQUESTED, INFLIGHT, and COMPLETED clean actions. When the instant is +archived, the sidecar is deleted as part of archival cleanup. + +**Orphan cleanup:** If a crash occurs after writing the sidecar but before persisting the plan, +the sidecar is orphaned. 
At cleaner startup, a lightweight cleanup routine lists +`.hoodie/.aux/clean/` and deletes any sidecar whose instant has no corresponding plan on the +timeline. + +**Rollback:** When a clean plan is rolled back, rollback logic reads +`extraMetadata["blobDeletesPath"]` and deletes the sidecar. + +**Size:** Dictionary encoding on shared path prefixes (e.g., `s3://media-bucket/videos/...`) +provides good compression. 10K blob paths ≈ a few hundred KB. ### Schema Changes: HoodieCleanMetadata @@ -459,7 +506,8 @@ A new nullable field `blobCleanStats` of type `HoodieBlobCleanStats`: - `totalBlobFilesDeleted`, `totalBlobFilesRetained` - `totalBlobStorageReclaimed` -- `deletedBlobFilePaths`, `failedBlobFilePaths` +- `blobDeletesSidecarPath` -- pointer to the sidecar (for COMPLETED-state conflict checks) +- `failedBlobFilePaths` -- only failures (expected to be empty or near-empty) ### hasBlobColumns() Gate @@ -485,31 +533,47 @@ snapshot and its actual file deletion is closed by a commit-time conflict check: 2. At commit time (in `preCommit()`, under the existing transaction lock), the writer checks all three clean states -- COMPLETED, INFLIGHT, and REQUESTED -- because a REQUESTED plan can begin executing at any moment (the REQUESTED->INFLIGHT transition doesn't acquire the transaction - lock). It checks `deletedBlobFilePaths` (COMPLETED) and `blobFilesToDelete` (INFLIGHT/REQUESTED). -3. If any overlap is found, the commit is rejected with `HoodieWriteConflictException` and the + lock). +3. For each clean instant, the writer locates the sidecar Parquet file: + - REQUESTED/INFLIGHT: reads `extraMetadata["blobDeletesPath"]` from the plan + - COMPLETED: reads `blobDeletesSidecarPath` from `HoodieCleanMetadata` +4. The writer reads the sidecar and checks intersection with its external blob paths. +5. If any overlap is found, the commit is rejected with `HoodieWriteConflictException` and the writer retries. -Cost is zero for non-blob tables. 
For external blobs: one timeline scan + 1-3 metadata reads. +The sidecar is the single source of truth across all three states -- no blob paths are stored +inline in the plan or completed metadata. + +**Note:** `ConcurrentOperation` and `TransactionUtils.getInflightAndRequestedInstants()` currently +exclude `CLEAN_ACTION`. Adding external blob cleanup requires extending conflict resolution to +include clean actions when blob columns exist. + +Cost is zero for non-blob tables. For external blobs: one timeline scan + 1-3 sidecar Parquet +reads (~50-200ms each on cloud storage), gated on the writer having external blob paths. ```mermaid sequenceDiagram participant W as Writer participant TL as Timeline + participant AUX as Sidecar (.aux) participant CL as Cleaner Note over W,CL: Scenario A: Writer commits BEFORE cleaner plans W->>TL: Commit (references blob_X) - CL->>TL: Plan cleanup + CL->>AUX: Write sidecar (blob_X not in delete list) + CL->>TL: Plan cleanup (extraMetadata -> sidecar path) Note right of CL: Sees blob_X in retained slice → not deleted Note over W,CL: ✓ Safe Note over W,CL: Scenario B: Writer commits AFTER cleaner plans, BEFORE delete - CL->>TL: Plan cleanup (blob_X in blobFilesToDelete) + CL->>AUX: Write sidecar (blob_X in delete list) + CL->>TL: Plan cleanup Note over TL: REQUESTED / INFLIGHT - W->>TL: preCommit() -- reads clean plan - Note left of W: Intersection found!
HoodieWriteConflictException
→ Writer retries + W->>TL: preCommit() -- reads plan, finds sidecar path + W->>AUX: Read sidecar Parquet + Note left of W: blob_X in sidecar
→ HoodieWriteConflictException
→ Writer retries Note over W,CL: ✓ Safe -- conflict detected Note over W,CL: Scenario C: Writer commits AFTER cleaner deletes, BEFORE COMPLETED @@ -517,14 +581,16 @@ sequenceDiagram CL->>CL: Delete blob_X from storage Note over TL: Still INFLIGHT W->>TL: preCommit() -- reads INFLIGHT plan - Note left of W: blob_X in blobFilesToDelete
→ Rejection + W->>AUX: Read sidecar Parquet + Note left of W: blob_X in sidecar → Rejection Note over W,CL: ✓ Safe -- same as B Note over W,CL: Scenario D: Cleaner completes, THEN writer acquires lock - CL->>TL: Transition to COMPLETED + CL->>TL: Transition to COMPLETED (metadata has sidecar path) W->>TL: preCommit() -- reads COMPLETED metadata - Note left of W: blob_X in deletedBlobFilePaths
→ Rejection + W->>AUX: Read sidecar Parquet + Note left of W: blob_X in sidecar → Rejection Note over W,CL: ✓ Safe ``` @@ -545,14 +611,15 @@ sequenceDiagram ### Crash Recovery Crash recovery is idempotent by construction, using the same mechanisms as existing file slice -cleaning: +cleaning. The sidecar Parquet file ensures the blob delete list survives crashes: -| Crash point | Recovery | -|-----------------------------------------|--------------------------------------------------------------------------------------------------------| -| During planning (before plan persisted) | No REQUESTED instant on timeline. Cleaner starts fresh. | -| After plan persisted, before execution | REQUESTED instant found; plan re-read and executed. | -| During execution (partial deletes) | INFLIGHT instant re-executed. Already-deleted files return FileNotFoundException -> treated as success. | -| After execution, before COMPLETED | INFLIGHT re-executed. All deletes are no-ops. Metadata written, instant transitions to COMPLETED. | +| Crash point | Recovery | +|-----------------------------------------------|--------------------------------------------------------------------------------------------------------| +| After sidecar written, before plan persisted | No REQUESTED instant on timeline. Cleaner starts fresh. Orphaned sidecar cleaned at next startup. | +| After plan persisted, before execution | REQUESTED instant found; plan re-read, sidecar re-read, executed. | +| During execution (partial deletes) | INFLIGHT instant re-executed. Sidecar re-read. Already-deleted files return FileNotFoundException -> success. | +| After execution, before COMPLETED | INFLIGHT re-executed. All deletes are no-ops. Metadata written, instant transitions to COMPLETED. | +| Plan rolled back | Rollback reads `extraMetadata["blobDeletesPath"]` and deletes the sidecar. | --- @@ -587,7 +654,7 @@ cleaning: Per-FG blob ref sets: ~100MB peak (500K records * 100 bytes/ref for expired + retained). 
FGs are processed sequentially within each partition batch -- per-FG sets are computed and discarded, not -accumulated. Only the output lists (`hudi_blob_deletes`, `external_candidates`) grow, containing +accumulated. Only the output list (`all_local_orphans`) grows, containing only orphaned refs (much smaller). Peak heap for Stage 1: ~100MB * `cleanerParallelism` = 400MB-1.6GB. Stage 2 output lists (`candidate_paths`, `all_record_keys`) can be large -- each cleaned FG may @@ -611,8 +678,8 @@ contribute row-level blob refs as candidates. These are backed by engine-context ## Rollout / Adoption Plan **Foundation (shared prerequisite).** `CleanPlanner` refactoring (policy methods return -`FileGroupCleanResult`), schema changes (nullable `blobFilesToDelete` field), and the -`hasBlobColumns` zero-cost gate. +`FileGroupCleanResult`), sidecar Parquet write/read infrastructure, and the `hasBlobColumns` +zero-cost gate. **Stage 1 (per-FG cleanup).** Set-difference logic. Produces local orphan candidates for Stage 2. @@ -620,8 +687,12 @@ contribute row-level blob refs as candidates. These are backed by engine-context cross-FG verification prevents premature deletion of shared blobs. Requires MDT + record index + secondary index on `reference.external_path`. Includes fallback table scan with circuit breaker. -**Writer-side conflict check.** `preCommit()` conflict check for concurrency safety. Closes the -writer-cleaner race window. Independent of the two stages. +**Sidecar lifecycle.** Write sidecar at planning time, read at execution time, clean up at archival. +Orphan cleanup at cleaner startup. Rollback cleanup via `extraMetadata`. + +**Writer-side conflict check.** `preCommit()` conflict check reads sidecar for concurrency safety. +Requires extending `ConcurrentOperation` / `TransactionUtils` to include clean actions when blob +columns exist. ### Backward Compatibility @@ -642,24 +713,32 @@ writer-cleaner race window. Independent of the two stages. 
- **Stage 2 index lookup:** Verify short-circuit behavior (stop after first live reference), empty results (globally orphaned), and batched prefix scans. - **Stage 2 fallback:** Verify table scan correctness and circuit breaker activation. -- **Writer-side conflict check:** Verify detection of conflicts with COMPLETED, INFLIGHT, and - REQUESTED clean actions. +- **Sidecar Parquet write/read:** Verify round-trip of blob delete list through sidecar file. +- **Writer-side conflict check:** Verify detection of conflicts via sidecar reads for COMPLETED, + INFLIGHT, and REQUESTED clean actions. ### Integration Tests - End-to-end clean cycle with external blob table and MDT secondary index (COW and MOR). - Clean cycle with replaced file groups (post-clustering, post-insert_overwrite). +- Sidecar lifecycle: verify sidecar created at planning, read at execution, deleted at archival. ### Concurrency Tests -- Writer-cleaner race scenarios A-D (from concurrency analysis) with external blobs. +- Writer-cleaner race scenarios A-D (from concurrency analysis) with external blobs and sidecar. - Concurrent clean + compaction with blob tables. +### Sidecar Lifecycle Tests + +- Orphan cleanup: sidecar exists without plan → cleaned at next startup. +- Rollback cleanup: plan rolled back → sidecar deleted. +- Archival cleanup: clean instant archived → sidecar deleted. +- Missing sidecar at execution: graceful handling (skip blob cleanup, log warning). + ### Backward Compatibility -- Non-blob table clean cycle produces identical behavior (no `blobFilesToDelete`, no - `blobCleanStats`). -- Clean plan deserialization with and without blob fields (nullable field compatibility). +- Non-blob table clean cycle produces identical behavior (no sidecar, no `blobCleanStats`). +- Clean plan deserialization with and without `extraMetadata["blobDeletesPath"]`. ---
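
To make the writer-side conflict check and the orphan sidecar cleanup in this patch concrete, here is a minimal Python sketch. It is not Hudi code. `CleanInstant`, `find_blob_conflicts`, `pre_commit`, `find_orphaned_sidecars`, and `WriteConflictError` are hypothetical names introduced for illustration, sidecar contents are modeled as in-memory sets rather than Parquet files, and treating a missing sidecar as an empty delete list is an assumption the RFC does not state for the writer path.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set


class WriteConflictError(Exception):
    """Hypothetical stand-in for HoodieWriteConflictException."""


@dataclass(frozen=True)
class CleanInstant:
    """Hypothetical view of one clean action on the timeline."""
    instant_time: str
    state: str                   # "REQUESTED", "INFLIGHT", or "COMPLETED"
    sidecar_path: Optional[str]  # None when the clean has no blob delete list


def find_blob_conflicts(writer_blob_paths: Set[str],
                        clean_instants: Iterable[CleanInstant],
                        sidecars: Dict[str, Set[str]]) -> Set[str]:
    """Return writer blob paths that appear in any clean's delete list."""
    conflicts: Set[str] = set()
    if not writer_blob_paths:
        # Gate: writers without external blob paths read no sidecars.
        return conflicts
    for instant in clean_instants:
        if instant.sidecar_path is None:
            continue
        # Assumption: a missing sidecar is treated as an empty delete list.
        delete_list = sidecars.get(instant.sidecar_path, set())
        conflicts |= writer_blob_paths & delete_list
    return conflicts


def pre_commit(writer_blob_paths: Set[str],
               clean_instants: Iterable[CleanInstant],
               sidecars: Dict[str, Set[str]]) -> None:
    """Reject the commit if any referenced blob is scheduled for deletion."""
    conflicts = find_blob_conflicts(writer_blob_paths, clean_instants, sidecars)
    if conflicts:
        raise WriteConflictError(
            f"blob paths in a clean delete list: {sorted(conflicts)}")


def find_orphaned_sidecars(sidecar_files: Iterable[str],
                           timeline_instants: Set[str]) -> List[str]:
    """Sidecars whose instant prefix has no corresponding plan on the timeline."""
    return [name for name in sidecar_files
            if name.split(".", 1)[0] not in timeline_instants]
```

The check is the same for all three clean states because the sidecar is the only place the delete list lives. Only the way the sidecar path is located differs (plan `extraMetadata` versus completed metadata), which the sketch abstracts into `sidecar_path`.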