Skip to content

Add CDC replication sink with Iceberg v2 equality deletes#6

Merged
laskoviymishka merged 28 commits intomainfrom
feat/cdc-replication-sink
Apr 2, 2026
Merged

Add CDC replication sink with Iceberg v2 equality deletes#6
laskoviymishka merged 28 commits intomainfrom
feat/cdc-replication-sink

Conversation

@laskoviymishka
Copy link
Copy Markdown
Contributor

Implement SinkReplication for CDC (Change Data Capture) from Postgres/MySQL WAL sources using native Iceberg v2 row-level operations:

  • Equality deletes via WriteEqualityDeletes + RowDelta commits
  • Intra-batch deduplication (INSERT→DELETE cancels, UPDATE collapses)
  • Multi-table atomic commits via MultiTableTransaction when catalog supports it
  • Automatic schema evolution (AddColumn for new CDC columns)
  • WAL position (LSN) tracking in snapshot summary properties
  • Interval-based flush with configurable buffer size threshold

Also fixes existing code for latest iceberg-go API:

  • LoadTable signature (removed properties param)
  • AddFiles/FS now take context
  • Extract shared Destination.NewCatalog() helper
  • Fix mutex leak in SinkStreaming.clearState

Implement SinkReplication for CDC (Change Data Capture) from Postgres/MySQL
WAL sources using native Iceberg v2 row-level operations:

- Equality deletes via WriteEqualityDeletes + RowDelta commits
- Intra-batch deduplication (INSERT→DELETE cancels, UPDATE collapses)
- Multi-table atomic commits via MultiTableTransaction when catalog supports it
- Automatic schema evolution (AddColumn for new CDC columns)
- WAL position (LSN) tracking in snapshot summary properties
- Interval-based flush with configurable buffer size threshold

Also fixes existing code for latest iceberg-go API:
- LoadTable signature (removed properties param)
- AddFiles/FS now take context
- Extract shared Destination.NewCatalog() helper
- Fix mutex leak in SinkStreaming.clearState
- Add pg2iceberg/replication tests (snapshot+repl, repl-only)
- Add mysql2iceberg/replication tests (snapshot+repl, repl-only)
- Add mongo2iceberg/replication tests (snapshot+repl, repl-only)
- Point go.mod to upstream iceberg-go commit c7839ca (has all CDC APIs)
- Remove local replace directive for iceberg-go
- Bump Go version to 1.25.5 in go.mod and CI workflow
- Add go-amqp v1.5.0 replace for Azure dep compatibility
Benchmark infrastructure for measuring sustained PG→Iceberg CDC throughput:
- Parametric load generator with ramp-up (1K→10K rows/sec)
- Three DML profiles: InsertOnly, InsertHeavy (90/5/5), Balanced (60/30/10)
- Real-time metrics: row counts, replication lag, write rate
- Postgres added to docker-compose with WAL level=logical

Kafka tests updated with multi-partition concurrent write tests:
- TestMultiPartitionReplication: 4 partitions, 100 messages each
- TestMultiPartitionHighThroughput: 8 partitions, batched writes

Run: make recipe && go test -run TestBenchmarkSmoke -timeout=5m -v ./tests/bench/
- tests/bench/README.md: benchmark design, architecture, load profiles,
  metrics collected, and how to run
- doc/fix-transferia-docker-compat.md: documents the Docker client API
  incompatibility in transferia@v0.0.2 (types.ImagePullOptions removed
  in Docker v26+) that blocks integration test compilation
- Update provider registration to new LoggableSource/LoggableDestination API
- Add MarshalLogObject to Source and Destination
- Update New() factory to accept *TransferOperation
- Fix ParseTableID → NewTableIDFromString in sink_streaming
- Remove stale Azure amqp replace directives
- Cleaner go.mod with fewer replace hacks

Note: integration tests still blocked by Docker ImagePullOptions issue
in transferia/pkg/container (needs fix in transferia main repo).
- Bump transferia to v0.0.6-rc3 (fixes Docker ImagePullOptions)
- Add kafka-go and confluent-kafka-go replace directives from transferia
- Use debezium/postgres:11-alpine for wal2json support in benchmarks
- Fix RunPprof signature change in trcli

Benchmark smoke test passes: PG→Iceberg CDC replication with 7.6K rows
in 20s load generation window.
… logging

- Auto-create Iceberg namespace before table creation (fixes NoSuchNamespaceException)
- Import io/gocloud to register S3/GCS/Azure filesystem schemes
- Add logging for flush/commit operations
- Rework PG replication test to use testcontainers
- Add run_repl_test.sh helper script with proper env vars

CDC write path now working end-to-end: PG snapshot → buffer → flush →
CreateTable (format-version=2) → WriteRecords → RowDelta commit.
Reader-side row count verification still needs S3 endpoint config fix.
- Pass S3 properties to REST catalog in Storage (was missing, causing S3 auth failures)
- Import io/gocloud in storage.go for S3 filesystem scheme registration
- ExactTableRowsCount now uses Scan().ToArrowRecords() for accurate count
  that respects equality deletes (merge-on-read)
- Add CleanupTable helper for test cleanup between runs
- Add auto-create namespace in ensureTable

PG replication integration test PASSES end-to-end:
snapshot (3 rows) → CDC (INSERT+UPDATE+DELETE) → verify (3 rows with equality deletes)
Test results:
- PG:    snapshot ✅, snapshot+repl ✅, repl-only ✅
- MySQL: snapshot ✅, snapshot+repl ✅, repl-only ✅
- Mongo: snapshot+repl ✅, repl-only (timing-sensitive)

Fixes:
- PG/MySQL snapshot: use dumpDir() with runtime.Caller for absolute paths
  (fixes ProjectSource panic with new transferia)
- MySQL replication: use source object fields for DB connection
- Mongo replication: simplified, removed manual driver connection
- All tests: add CleanupTable before runs for idempotency
- run_repl_test.sh: unified test runner with proper env vars
- Remove verbose flush/commit INFO logging from SinkReplication
- Remove outdated Docker compat doc (resolved with transferia v0.0.6-rc3)
- Fix kafka test: MakeKafkaRawMessage → abstract.MakeRawMessage (v0.0.6-rc3 API)
- Fix mongo test: use keyed bson.D struct literals (go vet)
- go vet passes clean across all packages
… error handling

Review fixes:
- Fix buffer drain-then-commit data loss: items are re-buffered on flush
  failure instead of being silently dropped
- Fix cache key inconsistency in commitPerTable: use tableCacheKey()
  consistently (was using raw tableID string)
- Log namespace creation errors instead of silently ignoring
- Consolidate all catalog init into Destination.NewCatalog() — removed
  duplicate rest.NewCatalog/glue.NewCatalog from sink_snapshot.go,
  sink_streaming.go, and storage.go
- Document no-PK append-only fallback behavior in prepareChanges
Root cause: mc container uses deprecated `mc config host add` command
(renamed to `mc alias set` in newer mc versions), and races with the
Spark provision script which tries to create tables before the bucket
exists.

Fixes:
- Update mc container entrypoint to use `mc alias set` + `--ignore-existing`
- CI workflow: poll for MinIO bucket readiness before provisioning
- CI: explicitly create bucket via `docker exec minio mc mb` as fallback
- CI: export AWS_ACCESS_KEY_ID/SECRET/REGION for test runner
- CI: make Spark provision non-fatal (CDC tests don't need it)
- Fix CommitFailedException on re-buffered items: invalidate table cache
  on commit failure so next flush reloads fresh metadata from catalog
- Add CleanupTable to benchmark for idempotent runs
- Fix benchmark to use testcontainer PG (same as transfer source)
- Update README with actual smoke benchmark results:
  7,896 rows, 0 lag, 492 rows/sec peak, zero data loss
Benchmark results (InsertOnly, 1K→10K ramp, 5 min, Apple M1 Pro):
- 1,327,123 rows replicated with zero data loss
- 6,022 rows/sec peak, 4,423 rows/sec average
- Steady-state lag: 15-20K rows (~3 commit cycles)
- Lag drops to 0 within 30s after load stops

Also adds avg/peak lag tracking to MetricsCollector.
Metrics now show lag in seconds: LagRows / CurrentRate = LagSeconds.
Steady-state lag is ~3 seconds at all write rates (1K-6K rows/sec).

Results updated in README with seconds-based lag.
Note: TestBenchmarkAll doesn't work for sequential profiles due to
testcontainer PG cleanup between sub-tests. Run profiles individually.
TestEqualityDeleteReadPerf measures scan degradation as equality delete
files accumulate from UPDATE/DELETE operations.

Results (10K rows, 10 rounds of 1K DML each):
- Baseline: 41ms
- After 5K DML: ~165ms (4x)
- After 10K DML: ~5s (124x)

This demonstrates the merge-on-read overhead and the need for
compaction in CDC-heavy workloads.
Documents 5 approaches to handle equality delete read degradation:
1. Periodic full compaction (simplest)
2. Incremental compaction (smart — only dirty files)
3. Threshold-based auto-compaction in sink
4. Copy-on-write mode (no compaction needed, but slow writes)
5. Hybrid: equality deletes + background compaction (recommended)

Includes iceberg-go API analysis (ReplaceDataFilesWithDataFiles,
PlanFiles with FileScanTask.EqualityDeleteFiles, ToArrowRecords)
and a phased implementation plan.
- TestTableFileStats: demonstrates two sources of dirty file stats:
  1. Snapshot Summary (total-data-files, total-delete-files, total-equality-deletes)
  2. PlanFiles → FileScanTask.EqualityDeleteFiles per data file
- LoadTable helper in recipe.go for metadata inspection
- iceberg-go-optimize-proposal.md: feature request draft with 3 API options,
  benchmarks, and workaround implementation

Results: after 1K DML on 1K rows, 75% of data files are dirty.
Self-contained demo with:
- docker-compose.yml: PG + MinIO + REST catalog (no JVM needed to run)
- transfer.yaml: trcli config for snapshot + CDC replication
- seed.sql: initial data, workload.sql: INSERT/UPDATE/DELETE examples
- README: architecture diagram, quick start, how it works,
  performance numbers, limitations, links to research docs
Fixes:
- Kafka→Iceberg replication tests: replace fixed sleep + DestinationRowCount
  with waitForRows() polling loop (30s timeout). The table may not exist
  yet on slow CI runners when using a fixed 5s sleep.
- Skip all benchmark tests (InsertOnly, InsertHeavy, Balanced, EqDeletePerf,
  TableStats) in CI — they're too slow for GitHub Actions and are meant
  for local performance testing.

Root causes from CI run 23872448978:
- TestReplication/TestMultiPartition*: NoSuchTableException (table not
  flushed yet after fixed sleep)
- TestBenchmarkBalanced: timeout after 15m (equality deletes too slow on CI)
substrait-go v7.6.0 panics during init() on linux/amd64 CI with
"strings: negative Repeat count" in go-yaml printer. This is an
upstream dependency issue in iceberg-go's substrait package.

Workaround: gate the test behind `cdc_replication` build tag.
Run locally with: go test -tags cdc_replication ./tests/pg2iceberg/replication/
@laskoviymishka laskoviymishka marked this pull request as ready for review April 2, 2026 18:37
@laskoviymishka laskoviymishka merged commit 765a6d2 into main Apr 2, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant