Atomic Postgres-to-Iceberg Snapshots #4

@laskoviymishka

Description

Problem

When snapshotting a Postgres database to Apache Iceberg (via tools like Transferia), each table is committed to the Iceberg catalog independently. If a failure occurs mid-way through a multi-table snapshot, the result is an inconsistent point-in-time view: some tables reflect the new snapshot, others reflect the previous state or are missing entirely.

This breaks the fundamental contract of a database snapshot — that all tables represent the same logical point in time.

Example: 5-table Postgres snapshot

Table     Commit Result
────────  ──────────────
users     ✓ committed (snapshot T₁)
orders    ✓ committed (snapshot T₁)
payments  ✗ failed (409 conflict)
products  — never attempted
inventory — never attempted

A consumer querying the Iceberg lakehouse now sees orders referencing users that exist, but payments for those orders are stale or missing. Foreign key relationships, aggregates, and joins produce silently wrong results.

Scope: Snapshot Only

This design addresses snapshot (full-load) transfers — bulk copy of all tables from Postgres to Iceberg at a point in time. Replication (CDC/WAL-based streaming with INSERT/UPDATE/DELETE) is a separate problem that requires full DML support in iceberg-go (row-level deletes, merge-on-read, equality deletes) and is explicitly out of scope here.

Why This Matters

  • Analytics correctness: Cross-table queries (joins, aggregates) require consistent snapshots. Partial commits violate this.
  • Regulatory/audit: Financial and compliance workloads require provable point-in-time consistency across related tables.
  • Operational confidence: Without atomicity, every failure requires manual investigation to determine which tables are stale.
  • Snapshot freshness: Failed partial snapshots cannot be safely retried without first understanding which tables were already committed — adding complexity and risk to recovery procedures.

How Snapshots Work Today (Transferia)

In transferia/iceberg, the snapshot sink processes tables sequentially:

  1. For each table, sharded workers write Parquet files to S3 in parallel
  2. Each worker publishes its file list to a shared Coordinator
  3. The lead worker receives a DoneShardedTableLoad control event per table
  4. On that event, it collects all files from all workers and commits a single Iceberg transaction for that one table
  5. The framework moves on to the next table

Table A: write files → DoneShardedTableLoad → tx.Commit(A) ✓
Table B: write files → DoneShardedTableLoad → tx.Commit(B) ✓
Table C: write files → DoneShardedTableLoad → tx.Commit(C) ✗ FAIL
         ─── inconsistent state ───

There is no mechanism to batch all table commits into a single atomic operation.
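The per-table flow above can be sketched as follows. `fakeCatalog`, `CommitTable`, and `snapshot` are illustrative stand-ins, not the transferia or iceberg-go API; the point is only that each table is committed independently, so a mid-run failure leaves a partial snapshot behind.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeCatalog is a stand-in for an Iceberg catalog client; the type and
// method names here are illustrative, not the actual iceberg-go API.
type fakeCatalog struct {
	committed map[string]bool
}

func (c *fakeCatalog) CommitTable(name string) error {
	if name == "payments" { // simulate the 409 conflict from the example above
		return errors.New("409 conflict")
	}
	c.committed[name] = true
	return nil
}

// snapshot mimics today's sink: each table is committed independently,
// so a mid-run failure leaves earlier tables committed and later ones
// never attempted.
func snapshot(cat *fakeCatalog, tables []string) error {
	for _, t := range tables {
		// (write Parquet files, wait for DoneShardedTableLoad, then...)
		if err := cat.CommitTable(t); err != nil {
			return fmt.Errorf("%s: %w (remaining tables never attempted)", t, err)
		}
	}
	return nil
}

func main() {
	cat := &fakeCatalog{committed: map[string]bool{}}
	err := snapshot(cat, []string{"users", "orders", "payments", "products", "inventory"})
	fmt.Println("error:", err)
	fmt.Println("tables committed before failure:", len(cat.committed)) // partial state: 2 of 5
}
```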

Solution: Multi-Table Atomic Commit

The Iceberg REST Catalog spec already defines an endpoint for this:

POST /v1/transactions/commit

{
  "table-changes": [
    { "identifier": {"namespace": ["public"], "name": "users"},
      "requirements": [...], "updates": [...] },
    { "identifier": {"namespace": ["public"], "name": "orders"},
      "requirements": [...], "updates": [...] },
    { "identifier": {"namespace": ["public"], "name": "payments"},
      "requirements": [...], "updates": [...] }
  ]
}

Response: 204 No Content (all-or-nothing)

All table changes either succeed together or fail together. No partial state.

The Java implementation (RESTSessionCatalog.commitTransaction()) has supported this since Iceberg 1.4. No other client library implements it — not Python (pyiceberg), not Go (iceberg-go), not Rust (iceberg-rust).

End-State: Snapshot Flow with Atomic Commit

Postgres ───snapshot───▶ Transferia ───Parquet───▶ S3
                              │
            ┌─────────────────┼─────────────────┐
            ▼                 ▼                 ▼
      write files A     write files B     write files C
            │                 │                 │
            ▼                 ▼                 ▼
      tx.TableCommit()  tx.TableCommit()  tx.TableCommit()
            │                 │                 │
            └────────┬────────┘                 │
                     └────────────┬─────────────┘
                                  ▼
                    ┌───────────────────────┐
                    │  POST /transactions/  │
                    │  commit               │
                    │  (atomic, all tables) │
                    └───────────────────────┘

All tables appear atomically at the same logical point in time. Consumers always see a consistent cross-table view.

Changes Required in Transferia

The snapshot sink needs a two-phase approach:

  1. Phase 1 (existing): For each table, write Parquet files and build a TableCommit (identifier + requirements + updates) — but do not commit.
  2. Phase 2 (new): After all tables are processed, submit all TableCommit objects in a single CommitTransaction() call.

This requires the framework to provide an "all tables done" signal (or the sink to accumulate commits across DoneShardedTableLoad events and flush them on Close()). The per-table DoneShardedTableLoad event becomes a "prepare" step rather than a "commit" step.

Fallback: if the catalog does not implement TransactionalCatalog (e.g., Glue), the sink falls back to per-table commits with best-effort semantics (current behavior).
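The two phases and the fallback can be sketched as follows. Every type here is hypothetical: `TableCommit` and `TransactionalCatalog` are the additions described under "What's Missing" (simplified, no context parameter), and `sink` is a toy model of the snapshot sink, not transferia code.

```go
package main

import "fmt"

// Hypothetical types mirroring the "What's Missing" section; none of
// these exist in iceberg-go yet.
type TableCommit struct {
	Identifier   string // simplified; the real type would carry namespace + name
	Requirements []any
	Updates      []any
}

type Catalog interface {
	CommitTable(tc TableCommit) error
}

type TransactionalCatalog interface {
	Catalog
	CommitTransaction(tcs []TableCommit) error
}

// sink accumulates per-table commits (phase 1) and flushes them
// atomically on Close (phase 2), falling back to per-table commits
// when the catalog is not transactional.
type sink struct {
	cat     Catalog
	pending []TableCommit
}

// onDoneShardedTableLoad becomes a "prepare" step: build the
// TableCommit but do not commit it yet.
func (s *sink) onDoneShardedTableLoad(tc TableCommit) {
	s.pending = append(s.pending, tc)
}

func (s *sink) Close() error {
	if txCat, ok := s.cat.(TransactionalCatalog); ok {
		return txCat.CommitTransaction(s.pending) // all-or-nothing
	}
	// fallback (e.g. Glue): best-effort per-table commits, current behavior
	for _, tc := range s.pending {
		if err := s.cat.CommitTable(tc); err != nil {
			return err
		}
	}
	return nil
}

// atomicCat is a toy TransactionalCatalog for demonstration only.
type atomicCat struct{ committed int }

func (c *atomicCat) CommitTable(TableCommit) error { c.committed++; return nil }
func (c *atomicCat) CommitTransaction(tcs []TableCommit) error {
	c.committed += len(tcs) // one atomic step covering every table
	return nil
}

func main() {
	cat := &atomicCat{}
	s := &sink{cat: cat}
	for _, t := range []string{"users", "orders", "payments"} {
		s.onDoneShardedTableLoad(TableCommit{Identifier: t})
	}
	fmt.Println("committed before Close:", cat.committed)
	_ = s.Close()
	fmt.Println("committed after Close:", cat.committed)
}
```

The type assertion in Close() is what makes the fallback cheap: catalogs that implement the extra interface get atomicity, everything else keeps today's behavior unchanged.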

What's Missing

No TransactionalCatalog interface exists in iceberg-go. The current Catalog interface only supports single-table commits via CommitTable(). What's needed:

  • A TableCommit struct bundling identifier + requirements + updates
  • A TransactionalCatalog interface with CommitTransaction(ctx, []TableCommit) error
  • REST catalog implementation: POST /v1/transactions/commit
  • A way to extract pending changes from a Transaction without committing (e.g., Transaction.TableCommit())
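Expressed as Go signatures, the additions in the list above might look like this. All names are proposals from the tracked issues, not existing iceberg-go API, and the placeholder types (`Identifier`, `Requirement`, `Update`, `Transaction`) stand in for the library's real ones.

```go
package main

import (
	"context"
	"fmt"
)

// Placeholder types standing in for existing iceberg-go concepts.
type (
	Identifier  []string // e.g. {"public", "users"}
	Requirement any
	Update      any
)

// TableCommit bundles one table's pending changes
// (sketch of what apache/iceberg-go#784 proposes).
type TableCommit struct {
	Identifier   Identifier
	Requirements []Requirement
	Updates      []Update
}

// TransactionalCatalog is the proposed extension: catalogs that can
// apply several tables' changes in one atomic operation implement it,
// backed by POST /v1/transactions/commit on the REST catalog.
type TransactionalCatalog interface {
	CommitTransaction(ctx context.Context, commits []TableCommit) error
}

// Transaction is a placeholder for iceberg-go's table transaction.
// TableCommit extracts the pending changes without committing them
// (sketch of what apache/iceberg-go#786 proposes).
type Transaction struct {
	id           Identifier
	requirements []Requirement
	updates      []Update
}

func (t *Transaction) TableCommit() TableCommit {
	return TableCommit{Identifier: t.id, Requirements: t.requirements, Updates: t.updates}
}

func main() {
	tx := &Transaction{id: Identifier{"public", "users"}}
	tc := tx.TableCommit()
	fmt.Println(tc.Identifier)
}
```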

Tracked Work

Component                                      Scope                                       Status
─────────────────────────────────────────────  ──────────────────────────────────────────  ─────────────────────
iceberg-go: TransactionalCatalog interface     New interface + TableCommit type            apache/iceberg-go#784
iceberg-go: REST CommitTransaction()           POST /transactions/commit impl              apache/iceberg-go#785
iceberg-go: Transaction.TableCommit()          Extract pending changes without committing  apache/iceberg-go#786
transferia/iceberg: two-phase snapshot commit  Accumulate TableCommit, flush atomically    After iceberg-go

Out of Scope

  • Replication / CDC: WAL-based streaming with UPDATE/DELETE requires row-level DML support in iceberg-go (equality deletes, position deletes, merge-on-read). Separate design needed.
  • REST Metrics Reporting: The REST spec's POST .../metrics endpoint is server-side only (client pushes, no read-back). Not useful for transferia's own observability. Can be pursued independently in iceberg-go for catalog-side visibility.
  • Schema evolution during snapshot: If Postgres schema changes between snapshots, the Iceberg table schema must evolve. Not addressed here.
  • Partitioned writes: Iceberg tables created by Transferia are currently unpartitioned. Partition support is orthogonal to atomic commit.
