This is a proof of concept of Change Data Capture (CDC) using Debezium and PostgreSQL.
graph TD
A[OrderService] -->|JDBC| B[(PostgreSQL)]
C[Debezium] -->|JDBC| B[(PostgreSQL)]
B[(PostgreSQL)] -->|Change Events| C[Debezium]
C[Debezium] -->|Change Events| D[(Kafka)]
D[(Kafka)] -->|Change Events| E[OrderEventListener]
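Each change event that reaches OrderEventListener in the flow above uses Debezium's envelope format, whose `op` field marks the kind of change (`c` create, `u` update, `d` delete, `r` snapshot read, `t` truncate). Below is a minimal sketch of how a listener might interpret that field. `ChangeEventOps` and its methods are hypothetical names for illustration and are not part of this repository.

```java
// Hypothetical helper for illustration only; not taken from this repository.
final class ChangeEventOps {
    private ChangeEventOps() {}

    /** Maps the "op" code of a Debezium change event to a readable label. */
    static String describe(String op) {
        if (op == null) {
            return "UNKNOWN";
        }
        return switch (op) {
            case "c" -> "CREATE";
            case "u" -> "UPDATE";
            case "d" -> "DELETE";
            case "r" -> "SNAPSHOT_READ";
            case "t" -> "TRUNCATE";
            default -> "UNKNOWN";
        };
    }

    /** True when the event's "after" field is expected to hold the new row state. */
    static boolean carriesAfterImage(String op) {
        return "c".equals(op) || "u".equals(op) || "r".equals(op);
    }
}
```

For delete events the `after` field is null and the row data is in `before`. With PostgreSQL, what `before` contains depends on the table's `REPLICA IDENTITY` setting.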
graph TD
A[controller-1] -->|Quorum| B[controller-2]
B[controller-2] -->|Quorum| C[controller-3]
C[controller-3] -->|Quorum| A[controller-1]
D[kafka-1] -->|Broker| A[controller-1]
D[kafka-1] -->|Broker| B[controller-2]
D[kafka-1] -->|Broker| C[controller-3]
E[kafka-2] -->|Broker| A[controller-1]
E[kafka-2] -->|Broker| B[controller-2]
E[kafka-2] -->|Broker| C[controller-3]
F[kafka-3] -->|Broker| A[controller-1]
F[kafka-3] -->|Broker| B[controller-2]
F[kafka-3] -->|Broker| C[controller-3]
G[postgres] -->|Database| H[debezium]
H[debezium] -->|Connect| D[kafka-1]
H[debezium] -->|Connect| E[kafka-2]
H[debezium] -->|Connect| F[kafka-3]
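The three controllers in the diagram above form a quorum that needs a majority of its voters to be available. The arithmetic behind that can be sketched as follows. `QuorumMath` is a hypothetical name for illustration and is not part of this repository.

```java
// Hypothetical helper for illustration only; not taken from this repository.
final class QuorumMath {
    private QuorumMath() {}

    /** Smallest number of voters that forms a majority. */
    static int majority(int voters) {
        if (voters < 1) {
            throw new IllegalArgumentException("voters must be >= 1");
        }
        return voters / 2 + 1;
    }

    /** Number of voters that can be lost while a majority remains. */
    static int tolerableFailures(int voters) {
        return voters - majority(voters);
    }
}
```

With three controllers, `majority(3)` is 2 and `tolerableFailures(3)` is 1, so the cluster keeps a working quorum while one controller is down.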
You need to have the following installed on your machine:
- Docker
- Java 21
- Gradle
- curl

- Re/Start the services using Docker Compose
    docker compose down && docker compose up
- Start the Order Service
    ./gradlew bootRun
- Configure Debezium Connector
Use http/DebeziumConnector.http collection to create a Debezium connector.
Alternatively, you can use the following command to create a Debezium connector:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
      "name": "order-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "orderdb",
        "database.server.name": "orderdb_server",
        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot",
        "publication.name": "order_publication",
        "publication.autocreate.mode": "filtered",
        "schema.include.list": "orderdb",
        "table.include.list": "orderdb.order",
        "topic.prefix": "cdc"
      }
    }'

Use http/OrderApplication.http collection to create/update/delete orders and observe incoming CDC events in Logs.
Alternatively, you can use the following curl commands:
Create an order:

    curl -X POST -H "Content-Type: application/json" -d '{
      "customerName": "Omer Kocaoglu",
      "customerEmail": "omersw@email.com",
      "customerAddress": {
        "street": "1234 Elm St",
        "city": "Springfield",
        "state": "IL",
        "zip": "62701"
      }
    }' http://localhost:8081/order-api/orders

Update an order (use the ID of an existing order):

    curl -X POST -H "Content-Type: application/json" -d '{
      "customerName": "Omer Kocaoglu",
      "customerEmail": "e@mail.com",
      "customerAddress": {
        "street": "1234 Elm St",
        "city": "Springfield",
        "state": "IL",
        "zip": "62701"
      },
      "status": "FULFILLED"
    }' http://localhost:8081/order-api/orders/6d2c4d98-5aa6-4340-80d8-d737cadea2c6

Delete an order:

    curl -X DELETE http://localhost:8081/order-api/orders/6d2c4d98-5aa6-4340-80d8-d737cadea2c6

Inspect Kafka Connect and the connector (status, config, topics):

    curl -X GET http://localhost:8083/
    curl -X GET http://localhost:8083/connectors
    curl -X GET http://localhost:8083/connectors/order-connector/status
    curl -X GET http://localhost:8083/connectors/order-connector/config
    curl -X GET http://localhost:8083/connectors/order-connector/topics

See Debezium Monitoring for more details.
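The `/connectors/order-connector/status` endpoint returns JSON with a `state` for the connector and for each task. Here is a minimal sketch of checking that everything reports `RUNNING`. It uses only the JDK's `java.net.http` client and a regular expression in place of a JSON library, which is a simplification; a real service would more likely parse the response properly. `ConnectorStatusCheck` is a hypothetical name for illustration and is not part of this repository.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not taken from this repository.
final class ConnectorStatusCheck {
    // Simplification: pulls every "state" value out of the raw JSON text.
    private static final Pattern STATE =
            Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    private ConnectorStatusCheck() {}

    /** Returns the connector state followed by each task state, in document order. */
    static List<String> states(String statusJson) {
        List<String> found = new ArrayList<>();
        Matcher matcher = STATE.matcher(statusJson);
        while (matcher.find()) {
            found.add(matcher.group(1));
        }
        return found;
    }

    /** True only if at least one state was found and all of them are RUNNING. */
    static boolean allRunning(String statusJson) {
        List<String> found = states(statusJson);
        return !found.isEmpty() && found.stream().allMatch("RUNNING"::equals);
    }

    /** Fetches the raw status JSON, e.g. from http://localhost:8083. */
    static String fetchStatus(String connectBaseUrl, String connectorName)
            throws IOException, InterruptedException {
        URI uri = URI.create(connectBaseUrl + "/connectors/" + connectorName + "/status");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the services running, `ConnectorStatusCheck.allRunning(ConnectorStatusCheck.fetchStatus("http://localhost:8083", "order-connector"))` would report whether the connector and its tasks are up.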
The Kafka cluster setup is taken from the official Kafka Docker image repository. See its Multi Node Cluster/Isolated section for more details.