
EH0C/redpanda-clickhouse-cdc


Redpanda CDC → ClickHouse

PostgreSQL → Debezium → Redpanda → ClickHouse change-data-capture pipeline.

Stack

| Service | Container | Port |
| --- | --- | --- |
| PostgreSQL | postgres | 5432 |
| Redpanda | redpanda | 19092 (external), 9092 (internal) |
| Debezium Connect | debezium-connect | 8083 |
| ClickHouse | clickhouse | 8123 (HTTP), 9000 (native) |

Step 1 — Start the stack

docker compose up -d

Wait until Debezium Connect is healthy before proceeding:

docker compose ps
# debezium-connect should show (healthy)
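If you would rather script this wait than re-run docker compose ps, a minimal Python sketch can poll the Kafka Connect REST API until it answers. It assumes Connect is published on localhost:8083 as in the Stack table. The function names are hypothetical, and an HTTP 200 from /connectors only shows that the REST API is up, not that Docker's healthcheck has passed.

```python
import time
import urllib.request

# Assumption: Debezium Connect's REST API is published on localhost:8083 (see the Stack table).
CONNECT_URL = "http://localhost:8083/connectors"


def connect_is_up(url: str = CONNECT_URL, opener=urllib.request.urlopen) -> bool:
    """Return True if the Kafka Connect REST API answers the URL with HTTP 200."""
    try:
        with opener(url, timeout=2) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError, refused connections and timeouts are all OSError subclasses
        return False


def wait_for_connect(url: str = CONNECT_URL, timeout_s: float = 120.0,
                     interval_s: float = 2.0, opener=urllib.request.urlopen) -> bool:
    """Poll until the REST API responds or timeout_s elapses; return whether it came up."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if connect_is_up(url, opener):
            return True
        time.sleep(interval_s)
    return False
```

Calling wait_for_connect() before Step 4 returns False if Connect is still unreachable after two minutes.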

Step 2 — Create the table in PostgreSQL

docker exec -it postgres psql -U user -d postgres

Then, at the psql prompt:
CREATE TABLE public.report (
    id               TEXT PRIMARY KEY,
    username         TEXT NOT NULL,
    product          TEXT NOT NULL,
    response_message TEXT NOT NULL DEFAULT ''
);

Step 3 — Insert sample data

INSERT INTO public.report (id, username, product, response_message) VALUES
('1', 'alice', 'product-a', 'success'),
('2', 'bob',   'product-b', 'success');

Exit psql: \q


Step 4 — Register the CDC connector

Run this from the project directory (where postgres-cdc-connector.json lives):

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-cdc-connector.json

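PowerShell users: call curl.exe explicitly (in Windows PowerShell, curl is an alias for Invoke-WebRequest), use a backtick (`) for line continuation, and quote the @ argument so PowerShell does not parse it as splatting: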

curl.exe -X POST http://localhost:8083/connectors `
  -H "Content-Type: application/json" `
  -d '@postgres-cdc-connector.json'
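The same registration can be sent with Python's standard library if neither curl variant is convenient. A minimal sketch, assuming the file name and port used above; the function names are hypothetical.

```python
import json
import urllib.request
from pathlib import Path


def build_register_request(config_path: str,
                           base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the same POST /connectors request as the curl commands above."""
    body = Path(config_path).read_bytes()
    json.loads(body)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(config_path: str = "postgres-cdc-connector.json") -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_register_request(config_path), timeout=10) as resp:
        return resp.status
```

Kafka Connect answers 201 Created when the connector is new. If a connector with the same name already exists it answers 409 Conflict, which urlopen raises as an HTTPError.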

Verify it is running:

curl -s http://localhost:8083/connectors/postgres-cdc-connector/status | python -m json.tool

Expected:

{
  "connector": { "state": "RUNNING" },
  "tasks": [{ "id": 0, "state": "RUNNING" }]
}
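A task can fail while the connector itself still reports RUNNING, so a scripted check should look at both levels. A minimal Python sketch, assuming the status JSON has the shape shown above; the function names are hypothetical.

```python
import json
import urllib.request


def fetch_status(name: str = "postgres-cdc-connector",
                 base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=5) as resp:
        return json.load(resp)


def is_connector_running(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks is not streaming anything.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
```

In a setup script, is_connector_running(fetch_status()) can gate the move to Step 5.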

Debezium performs an initial snapshot of the existing rows and publishes them to the Redpanda topic pgserver.public.report. Wait ~5 seconds for the snapshot to complete before moving on.
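The topic name follows Debezium's default naming scheme for PostgreSQL data topics, <topic.prefix>.<schema>.<table>. Assuming postgres-cdc-connector.json sets topic.prefix to pgserver (that file is not shown here; Debezium 1.x called the property database.server.name) and applies no topic-routing transform, the mapping is:

```python
def debezium_topic(topic_prefix: str, schema: str, table: str) -> str:
    """Default Debezium PostgreSQL data topic name: <topic.prefix>.<schema>.<table>."""
    return f"{topic_prefix}.{schema}.{table}"


# The topic that kafka_topic_list points at in Step 5:
REPORT_TOPIC = debezium_topic("pgserver", "public", "report")
```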


Step 5 — Set up ClickHouse Kafka engine

Open the ClickHouse client:

docker exec -it clickhouse clickhouse-client

Then run the three statements below in order.

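1. Kafka queue table (reads raw messages from Redpanda; do not query it directly, because a SELECT on a Kafka engine table consumes messages that the materialized view would then never see):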

CREATE TABLE report_queue (
    id               String,
    username         String,
    product          String,
    response_message String,
    __op             String
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'redpanda:9092',
    kafka_topic_list = 'pgserver.public.report',
    kafka_group_name = 'clickhouse-consumer-group',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1;
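The report_queue columns only match the messages if each Redpanda record is a flat JSON object holding the row columns plus __op. Debezium's raw change events are nested (before, after, op), so this setup presumably relies on postgres-cdc-connector.json applying Debezium's ExtractNewRecordState ("unwrap") transform with add.fields=op and JSON schemas disabled. That file is not shown here, so treat this as an assumption. A simplified Python model of that flattening (function names hypothetical):

```python
import json
from typing import Optional


def flatten_envelope(envelope: dict) -> Optional[dict]:
    """Simplified model of ExtractNewRecordState with add.fields=op.

    Returns the flat row report_queue expects, or None for events without an
    'after' image (deletes); the real SMT's delete handling depends on its config.
    """
    after = envelope.get("after")
    if after is None:
        return None
    row = dict(after)
    row["__op"] = envelope["op"]  # 'r' snapshot read, 'c' insert, 'u' update, 'd' delete
    return row


def to_json_each_row(rows) -> str:
    """JSONEachRow: one JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in rows)
```

A message that does not parse into these columns is what kafka_skip_broken_messages = 1 tolerates: the engine may skip up to one such message per block instead of failing.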

2. Storage table (where consumed rows are stored and deduplicated by id):

CREATE TABLE report (
    id               String,
    username         String,
    product          String,
    response_message String,
    op               String,
    _ingested_at     DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(_ingested_at)
ORDER BY id;

3. Materialized view (pipes rows from the queue into the storage table automatically):

CREATE MATERIALIZED VIEW report_mv
TO report AS
SELECT
    id,
    username,
    product,
    response_message,
    __op AS op
FROM report_queue;

Once the MV is created, ClickHouse starts consuming immediately. Wait a few seconds for the initial snapshot rows to land.
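Conceptually, report_mv is a per-block transform from the queue's columns to the storage table's columns. A simplified Python model (function name hypothetical): it renames __op to op and leaves _ingested_at to the column default. The sketch assumes one timestamp per block and truncates it to whole seconds, since ClickHouse DateTime has one-second resolution.

```python
from datetime import datetime, timezone


def mv_transform(block, now=None):
    """Model of report_mv: rename __op to op; _ingested_at comes from DEFAULT now()."""
    ts = now if now is not None else datetime.now(timezone.utc).replace(microsecond=0)
    return [
        {
            "id": r["id"],
            "username": r["username"],
            "product": r["product"],
            "response_message": r["response_message"],
            "op": r["__op"],
            "_ingested_at": ts,  # same value for every row in this block
        }
        for r in block
    ]
```

Because of the one-second resolution, two versions of the same id can end up with equal _ingested_at values; ReplacingMergeTree then falls back to keeping the row inserted last.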


Step 6 — Query ClickHouse

Deduplicated query using FINAL

SELECT * FROM report FINAL;

You should see the 2 rows inserted in Step 3 with op = 'r' (initial snapshot read).
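To run the same check from a script instead of clickhouse-client, ClickHouse's HTTP interface accepts a read-only query in the query parameter of a GET request. A minimal Python sketch that polls the deduplicated row count, assuming the HTTP port 8123 from the Stack table and the default user without a password (add user and password query parameters otherwise); the function names are hypothetical.

```python
import time
import urllib.parse
import urllib.request

CLICKHOUSE_URL = "http://localhost:8123/"


def query_url(sql: str, base_url: str = CLICKHOUSE_URL) -> str:
    """Encode a read-only query as the `query` parameter of a GET request."""
    return base_url + "?" + urllib.parse.urlencode({"query": sql})


def http_get(url: str) -> str:
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read().decode()


def wait_for_rows(expected: int, timeout_s: float = 60.0, interval_s: float = 1.0,
                  fetch=http_get) -> int:
    """Poll SELECT count() ... FINAL until it reaches `expected` or time runs out."""
    url = query_url("SELECT count() FROM report FINAL")
    deadline = time.monotonic() + timeout_s
    while True:
        count = int(fetch(url).strip())  # default TabSeparated output, e.g. "2\n"
        if count >= expected or time.monotonic() >= deadline:
            return count
        time.sleep(interval_s)
```

With the sample data from Step 3, wait_for_rows(2) should return 2 once the snapshot has landed.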

ReplacingMergeTree deduplicates rows that share the same sorting key (here, id) during background merges, keeping the row with the highest _ingested_at. Merges run at unpredictable times, so use FINAL to get deduplicated results at query time instead of waiting for them.
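The result FINAL returns can be modeled in a few lines of Python. This is a simplified sketch of the semantics, not how ClickHouse implements it: for each id keep the row with the largest _ingested_at, and when versions tie keep the row inserted last (the input list is assumed to be in insertion order). The output is sorted by id only to make it deterministic; FINAL itself does not promise an order without ORDER BY.

```python
from datetime import datetime


def final_view(rows):
    """Model of SELECT * FROM report FINAL for ReplacingMergeTree(_ingested_at) ORDER BY id."""
    latest = {}
    for row in rows:  # rows in insertion order
        current = latest.get(row["id"])
        # '>=' makes the later-inserted row win when versions are equal
        if current is None or row["_ingested_at"] >= current["_ingested_at"]:
            latest[row["id"]] = row
    return [latest[k] for k in sorted(latest)]
```

Feeding it the two snapshot rows plus a later op='u' row for id '1' returns two rows, with the updated one for id '1', which is what Step 7 expects.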

Force immediate deduplication

To remove duplicates from storage right away instead of waiting for a background merge (this rewrites the table's data parts, so it is expensive on large tables):

OPTIMIZE TABLE report FINAL;

Step 7 — Verify CDC updates propagate to ClickHouse

Update a row in PostgreSQL

docker exec -it postgres psql -U user -d postgres

Then, at the psql prompt:
UPDATE public.report
SET response_message = 'updated response'
WHERE id = '1';

Debezium captures the change and publishes it to Redpanda as an op=u event. ClickHouse inserts the updated row with a new _ingested_at timestamp.

Confirm the update in ClickHouse

SELECT * FROM report FINAL WHERE id = '1';

You should see response_message = 'updated response' and op = 'u'.

Without FINAL you may temporarily see both the old and new row until the background merge runs.


Troubleshooting — Missing rows in ClickHouse after a pipeline reset

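ClickHouse only receives rows through CDC events. It has no direct connection to Postgres and cannot backfill on its own. If you truncate the ClickHouse report table, reset the connector, or delete the Redpanda topic, any row that does not receive a new CDC event afterward will be missing from ClickHouse. (After a truncate, for example, the ClickHouse consumer group has already committed its offsets, so the old messages are not re-read.)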

Symptom: Postgres has N rows but ClickHouse has fewer.
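To see exactly which rows are missing before touching anything, you can diff the ids on both sides. A minimal Python sketch, assuming the container names and credentials used elsewhere in this README and the default ClickHouse user without a password; the function names are hypothetical.

```python
import subprocess
import urllib.parse
import urllib.request


def missing_ids(pg_ids, ch_ids):
    """Ids present in PostgreSQL but absent from ClickHouse, sorted."""
    return sorted(set(pg_ids) - set(ch_ids))


def postgres_ids():
    # -A: unaligned output, -t: rows only, so stdout is one id per line
    out = subprocess.run(
        ["docker", "exec", "postgres", "psql", "-U", "user", "-d", "postgres",
         "-At", "-c", "SELECT id FROM public.report"],
        check=True, capture_output=True, text=True,
    ).stdout
    return out.splitlines()


def clickhouse_ids(base_url="http://localhost:8123/"):
    # Read-only query over the ClickHouse HTTP interface (default TabSeparated output).
    url = base_url + "?" + urllib.parse.urlencode({"query": "SELECT DISTINCT id FROM report"})
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read().decode().splitlines()
```

With the stack running, missing_ids(postgres_ids(), clickhouse_ids()) lists the ids that need a fresh CDC event.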

Fix: Touch every row in Postgres to force Debezium to re-publish them:

docker exec -it postgres psql -U user -d postgres

Then, at the psql prompt:
UPDATE public.report SET response_message = response_message;

This no-op update causes Debezium to emit an op=u event for every row, which ClickHouse then consumes and stores.

About

Real-time CDC pipeline using PostgreSQL, Redpanda (Kafka), Debezium, and ClickHouse — fully containerized with Docker Compose
