feat(plugin-iceberg): Bound MV refresh#27774
Conversation
Reviewer's GuideImplements bounded incremental refresh for Iceberg materialized views using a new max_snapshots_per_refresh MV property plus matching config/session default, threads the bound through metadata/status computation and the planner so that refresh scans are capped per base table using Iceberg V3 row lineage, and adds extensive tests around bounded behavior, lineage, compaction, rollback, partitioned MVs, and configuration resolution. Sequence diagram for bounded Iceberg MV refresh planningsequenceDiagram
actor User
participant Session
participant IcebergMetadata as IcebergAbstractMetadata
participant Status as MaterializedViewStatus
participant Planner as IncrementalRefreshRule
participant Rewriter as IncrementalRefreshPredicateRewriter
participant Scan as TableScanNode
User->>Session: REFRESH MATERIALIZED VIEW
Session->>IcebergMetadata: getMaterializedViewStatus(session, mvName)
IcebergMetadata->>IcebergMetadata: resolveMaxSnapshotsPerRefresh(session, viewProperties)
IcebergMetadata->>IcebergMetadata: chooseTargetSnapshot(baseTable, watermark, maxSnapshots)
IcebergMetadata-->>Session: MaterializedViewStatus(partitionDisjuncts, incrementalRefreshPredicate)
Session->>Planner: apply(RefreshMaterializedViewNode)
Planner->>Status: getPartitionsFromBaseTables()
Planner->>Planner: buildDeltaPlanForRefresh(...)
Planner->>Planner: applyIncrementalRefreshPredicates(plan, incrementalRefreshPredicates, ...)
Planner->>Rewriter: SimplePlanRewriter.rewriteWith(perBasePredicates, plan)
Rewriter->>Scan: visitTableScan(TableScanNode)
Rewriter-->>Planner: FilterNode(TableScanNode)
Planner-->>Session: plan with bounded base table scans
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Advance each base's watermark to the N-th-oldest ancestor in (watermark, HEAD] and push a _last_updated_sequence_number bound into the refresh scan via existing V3 row-lineage pushdown.
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
IcebergAbstractMetadata.getMaterializedViewStatus, the rollback/branch guard usesisAncestorOf(baseIcebergTable, currentSnapshotId, recordedSnapshotId)butSnapshotUtil.isAncestorOfexpects(ancestor, descendant); sincerecordedSnapshotIdis older, this arg order appears reversed and will incorrectly forceNOT_MATERIALIZEDwhenever the base advances, effectively disabling incremental refresh after the first change. chooseTargetSnapshotalready contains an ancestry guard (isAncestorOf(baseTable, head, watermark)), butgetMaterializedViewStatusadds a separate ancestry check with different parameter ordering and semantics; consider centralizing this logic inchooseTargetSnapshot(or a shared helper) to avoid divergence and subtle bugs in rollback/branch handling.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `IcebergAbstractMetadata.getMaterializedViewStatus`, the rollback/branch guard uses `isAncestorOf(baseIcebergTable, currentSnapshotId, recordedSnapshotId)` but `SnapshotUtil.isAncestorOf` expects `(ancestor, descendant)`; since `recordedSnapshotId` is older, this arg order appears reversed and will incorrectly force `NOT_MATERIALIZED` whenever the base advances, effectively disabling incremental refresh after the first change.
- `chooseTargetSnapshot` already contains an ancestry guard (`isAncestorOf(baseTable, head, watermark)`), but `getMaterializedViewStatus` adds a separate ancestry check with different parameter ordering and semantics; consider centralizing this logic in `chooseTargetSnapshot` (or a shared helper) to avoid divergence and subtle bugs in rollback/branch handling.
## Individual Comments
### Comment 1
<location path="presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergRestMaterializedViews.java" line_range="596-604" />
<code_context>
+ }
+ }
+
+ private static void assertAdvanceLeq(long oldWatermark, long newWatermark, long head, int bound, String baseTable)
+ {
+ if (oldWatermark == newWatermark) {
+ fail("expected watermark to advance from " + oldWatermark + " on base " + baseTable);
+ }
+ if (newWatermark == head) {
+ return;
+ }
+ assertNotEquals(newWatermark, oldWatermark);
+ }
}
</code_context>
<issue_to_address>
**issue (testing):** assertAdvanceLeq helper does not actually enforce the "<= bound" invariant
The helper’s behavior doesn’t match its name or call-site intent: it only checks that the watermark advanced (and possibly hit `head`), but never that it advanced by *no more than* `bound` snapshots. A change that advances by 10 when `bound=2` would still pass. Please tighten this by explicitly asserting the step count is `<= bound` (e.g., via `ancestorIdsBetween(head, oldWatermark, ...)` and checking `newWatermark` is within `bound` steps), or by exposing a small test hook that reveals the chosen target snapshot so the boundedness can be asserted directly.
</issue_to_address>
### Comment 2
<location path="presto-docs/src/main/sphinx/connector/iceberg.rst" line_range="551" />
<code_context>
- is hashed to a particular node when determining the which worker to
- assign a split to. Splits which read data from the same file within
- the same chunk will hash to the same node. A smaller chunk size will
- result in a higher probability splits being distributed evenly across
- the cluster, but reduce locality.
- See :ref:`develop/connectors:Node Selection Strategy`.
</code_context>
<issue_to_address>
**issue (typo):** Insert "of" after "probability" for correct phrasing.
```suggestion
result in a higher probability of splits being distributed evenly across
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| private static void assertAdvanceLeq(long oldWatermark, long newWatermark, long head, int bound, String baseTable) | ||
| { | ||
| if (oldWatermark == newWatermark) { | ||
| fail("expected watermark to advance from " + oldWatermark + " on base " + baseTable); | ||
| } | ||
| if (newWatermark == head) { | ||
| return; | ||
| } | ||
| assertNotEquals(newWatermark, oldWatermark); |
There was a problem hiding this comment.
issue (testing): assertAdvanceLeq helper does not actually enforce the "<= bound" invariant
The helper’s behavior doesn’t match its name or call-site intent: it only checks that the watermark advanced (and possibly hit head), but never that it advanced by no more than bound snapshots. A change that advances by 10 when bound=2 would still pass. Please tighten this by explicitly asserting the step count is <= bound (e.g., via ancestorIdsBetween(head, oldWatermark, ...) and checking newWatermark is within bound steps), or by exposing a small test hook that reveals the chosen target snapshot so the boundedness can be asserted directly.
| is hashed to a particular node when determining the which worker to | ||
| assign a split to. Splits which read data from the same file within | ||
| the same chunk will hash to the same node. A smaller chunk size will | ||
| result in a higher probability splits being distributed evenly across |
There was a problem hiding this comment.
issue (typo): Insert "of" after "probability" for correct phrasing.
| result in a higher probability splits being distributed evenly across | |
| result in a higher probability of splits being distributed evenly across |
|
@steveburnett I refactored the docs a bit and added more user facing docs, let me know what you think! |
Description
Adds a bounded incremental-refresh mode for Iceberg materialized views. A new
max_snapshots_per_refreshMV property (with a matching session/config default) caps how far each base table can advance in a singleREFRESH MATERIALIZED VIEW, using the_last_updated_sequence_numberrow-lineage column to limit the refresh scan. Remaining backlog is picked up by subsequent refreshes.Motivation and Context
On busy tables an MV can fall far behind, and a single
REFRESHmay be too large to complete in one statement. This lets operators chunk catch-up into bounded steps. V2 tables, which lack row lineage, fall back to unbounded refresh.Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
Summary by Sourcery
Add bounded incremental refresh support for Iceberg materialized views, including configuration, planning, and metadata handling to cap the number of snapshots consumed per refresh and integrate with row lineage.
New Features:
Enhancements:
Tests: