WalStream CDC

Enterprise Change Data Capture Platform with FastAPI, PostgreSQL, Redis, Kafka, and Next.js.

Overview

WalStream is a production-ready CDC (Change Data Capture) system that:

  • Captures PostgreSQL WAL changes in real-time using wal2json format-version 2
  • Dual-writes to Redis Streams (low-latency buffer) and Kafka (durable archive)
  • Provides speed-controlled replay with idempotency guarantees
  • Offers a REST API control plane with JWT user auth and API-key service auth
  • Includes real-time WebSocket event streaming
  • Uses async-native libraries throughout (aiokafka, redis.asyncio, grpc.aio)

Architecture

WalStream CDC System Design

Source PostgreSQL (:5432, WAL + wal2json v2)
  -> Ingestor
      -> Redis Stream (live buffer, MAXLEN trimmed)
      -> Kafka Topic (durable archive)
      -> Prometheus metrics (:9090)

Worker Service (`app.workers.manager`)
  -> Replay Router (Redis < 1h / Kafka >= 1h)
  -> ReplayEvent RPC calls

gRPC Replayer (:50051, :9092 metrics)
  -> Target Database

FastAPI Control API (:8000, :9091 metrics)
  -> REST API (/api/v1/*)
  -> WebSocket Events (/api/v1/events/ws)
  -> AuthN/AuthZ (JWT + API keys, RBAC, audit, rate limiting)

Control PostgreSQL (:5433; jobs, users, audit logs, API keys, dedup)
  <-> FastAPI Control API
  <-> Worker Service
  <-> gRPC Replayer (dedup state)

Frontend Dashboard (:3000)
  -> Control API (REST + WebSocket)
  -> Metrics proxy UI (/api/metrics)

Prometheus (:9099)
  -> Scrapes ingestor/control/replayer metrics
  -> Alertmanager (:9093)
  -> Grafana (:3001)

Architectural Decisions

  • Centralized dedup in Replayer: Workers do not maintain local dedup state; idempotency is enforced in replayer/server.py.
  • At-least-once replay semantics: Replayer uses exists -> apply -> mark_processed so failed applies are retried instead of being pre-marked as duplicates.
  • Fail-fast job progression on unreplayable events: Workers retry each event up to MAX_EVENT_RETRIES times after the initial attempt; if still failing, the job is marked FAILED without advancing checkpoint beyond the failed event.
  • Dual replay source strategy: Replay router serves recent/small windows from Redis and historical/large windows from Kafka.
  • Source-pinned resume behavior: Resumed jobs reuse their original replay_source to avoid cross-source checkpoint mismatches (for example Redis stream IDs interpreted as Kafka offsets).
  • Per-partition Kafka checkpoints: Kafka progress is stored in JSON (checkpoint) keyed by partition, while last_processed_id is retained as a fallback string checkpoint.
  • Safe Kafka partition seek semantics: On resume, uncheckpointed partitions seek to start_ms; if no offset exists at/after start_ms, they seek to partition end to avoid replaying out-of-window historical data.
  • Cross-partition Kafka end-boundary safety: Replay does not stop globally on the first out-of-range message; partitions are completed independently to avoid dropping in-range records from other partitions.
  • Lease-based worker ownership: Job execution uses DB-backed leases plus renewal to prevent concurrent workers from processing the same job.
  • Lease expiry recovery for orphaned jobs: Workers can reclaim expired-lease RUNNING jobs after crashes, not just QUEUED jobs.
  • Read/write session separation by path: Auth and health/readiness/event-stream read paths use read sessions; API key last_used_at updates are handled with explicit write context.
  • Control-plane operational gauges: walstream_jobs_active and walstream_job_lease_expired are exported for replay stall and lease-integrity alerting.
  • Async I/O across services: aiokafka, redis.asyncio, grpc.aio, and async SQLAlchemy are used to keep ingestion and replay non-blocking.
  • Hybrid auth model: User-facing clients use JWT; service clients can authenticate with X-API-Key; health probes remain public.
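The Kafka resume rules above (checkpointed partitions continue, uncheckpointed partitions seek by the window's start timestamp, empty windows seek to end) reduce to a small per-partition decision. A sketch with illustrative names (with aiokafka, the timestamp lookup would come from AIOKafkaConsumer.offsets_for_times, and positioning from seek / seek_to_end):

```python
from typing import Optional

def resolve_resume_offset(
    checkpointed: Optional[int],
    offset_at_start_ms: Optional[int],
    end_offset: int,
) -> int:
    """Per-partition resume decision (illustrative, not the repo's API).

    - A checkpointed partition continues just past its last processed offset.
    - An uncheckpointed partition seeks to the first offset at/after start_ms.
    - A partition with no in-window offset seeks to its end, so out-of-window
      historical data is never replayed.
    """
    if checkpointed is not None:
        return checkpointed + 1
    if offset_at_start_ms is not None:
        return offset_at_start_ms
    return end_offset

print(resolve_resume_offset(41, None, 100))    # -> 42 (resume after checkpoint)
print(resolve_resume_offset(None, 7, 100))     # -> 7  (seek by timestamp)
print(resolve_resume_offset(None, None, 100))  # -> 100 (seek to end)
```

Because each partition resolves independently, one partition hitting its end boundary never stops in-range delivery from the others.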

Frontend Dashboard

WalStream includes a modern web dashboard built with Next.js 14:

Frontend (Next.js 14 + TypeScript)
       |
       +---> /login          - JWT Authentication
       +---> /                - Dashboard Overview
       +---> /jobs            - Replay Job Management
       +---> /jobs/new        - Create New Job
       +---> /jobs/[id]       - Job Details & Actions
       +---> /events          - Real-time Event Stream
       +---> /metrics         - Prometheus Metrics Dashboard

Frontend Tech Stack

  • Framework: Next.js 14 with App Router
  • Language: TypeScript
  • UI Components: shadcn/ui + Tailwind CSS
  • State Management: Zustand (auth, events, ui)
  • Server State: TanStack Query (React Query)
  • Charts: Recharts
  • Virtualization: @tanstack/react-virtual

Features

  • Real-time Change Capture: PostgreSQL logical replication via WAL streaming
  • Dual Storage Pipeline: Redis Streams for live tail, Kafka for historical replay
  • At-Least-Once Delivery: With centralized deduplication at the Replayer
  • Speed-Controlled Replay: Configurable 0.1x to 100x replay speed
  • Durable Job Execution: Lease-based workers with checkpoint/resume
  • REST API: FastAPI-based control plane with OpenAPI docs
  • Security Controls: RBAC, API keys, audit logging, and Redis-backed rate limiting
  • WebSocket Streaming: Real-time event monitoring
  • Prometheus Metrics: Full observability for all components
  • Async-Native: aiokafka, redis.asyncio, grpc.aio for non-blocking I/O
  • Docker Ready: Complete docker-compose setup
  • Modern Frontend: Next.js 14 dashboard with real-time updates
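Speed-controlled replay amounts to scaling the original inter-event gaps by the speed factor. A sketch of the pacing arithmetic (illustrative, not the repo's scheduler):

```python
def replay_delay(prev_event_ts: float, event_ts: float,
                 speed_factor: float) -> float:
    """Wall-clock delay before emitting the next event, given the original
    event timestamps (seconds) and a speed factor. A 2.0x job replays a
    10s gap in 5s; a 0.5x job stretches it to 20s.
    """
    gap = max(0.0, event_ts - prev_event_ts)
    return gap / max(speed_factor, 0.1)  # clamp to the documented 0.1x floor

print(replay_delay(100.0, 110.0, 2.0))  # -> 5.0
```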

Quick Start

Prerequisites

  • Docker & Docker Compose
  • Python 3.11+
  • uv (recommended) or pip
  • Node.js 18+ (for frontend development)

Database Architecture

WalStream uses two PostgreSQL databases:

| Database | Port | Purpose |
| --- | --- | --- |
| postgres (walstreamdb) | 5432 | Source database with WAL for CDC capture |
| postgres-control | 5433 | Control plane database (jobs, users, dedup state) |

Option A: Docker Setup (Recommended)

1. Clone & Configure

git clone <repo-url>
cd CDC-FastAPI
cp .env.example .env
cp frontend/.env.example frontend/.env.local

2. Start Infrastructure

# Start databases, Redis, and Kafka
docker-compose up -d postgres postgres-control redis zookeeper kafka

# Wait until all infrastructure services are healthy
docker-compose ps

3. Run Database Migrations (Required Before First Start)

The control plane verifies the database schema on startup and will refuse to start if migrations have not been applied. Run Alembic from inside the control image (which already includes the migration files):

# Build the control image once
docker-compose build control

# Apply migrations to the control-plane database
docker-compose run --rm control alembic upgrade head

# (Optional) Verify the current revision
docker-compose run --rm control alembic current

Or, if you have Python + dependencies installed locally:

make migrate          # runs: cd control && alembic upgrade head
make migrate-check    # shows current revision

Why migration-first? The control plane calls check_db_revision() at startup and compares the database's Alembic head against the expected revision compiled into the code. A mismatch raises RuntimeError and exits immediately. This prevents the application from running against a stale schema.

4. Start Application Services

docker-compose up -d ingestor control replayer worker frontend

The Docker setup automatically:

  • Configures PostgreSQL with wal_level=logical
  • Runs db/init.sql to create tables and replication user

Note: In this repo, docker-compose.yml already builds the source PostgreSQL service from db/Dockerfile, which installs wal2json. If you run PostgreSQL outside this compose stack, ensure wal2json is installed on that server:

FROM postgres:16
RUN apt-get update && apt-get install -y postgresql-16-wal2json && rm -rf /var/lib/apt/lists/*

5. Verify Setup

# Check all services are running
docker-compose ps

# Check PostgreSQL replication is configured
docker exec walstream-postgres psql -U postgres -c "SHOW wal_level;"
# Should output: logical

# Check replication user exists
docker exec walstream-postgres psql -U postgres -c "SELECT rolname FROM pg_roles WHERE rolreplication = true;"
# Should show: repluser

6. Access Services

| Service | URL |
| --- | --- |
| Frontend Dashboard | http://localhost:3000 |
| Control API (REST + WebSocket) | http://localhost:8000 |
| Ingestor metrics | http://localhost:9090/metrics |
| Control metrics | http://localhost:9091/metrics |
| Replayer metrics | http://localhost:9092/metrics |
| gRPC Replayer | localhost:50051 |
| Prometheus (monitoring profile) | http://localhost:9099 |
| Grafana (monitoring profile) | http://localhost:3001 |

Option B: Manual/Local Setup

1. Install Dependencies

# Using uv (recommended)
uv sync

# Or using pip
pip install -e ".[dev]"

# Generate protobuf files
cd walstream-proto
python -m walstream_proto.generate
cd ..

2. PostgreSQL Setup (Source Database)

2.1 Configure WAL for Logical Replication

Edit postgresql.conf:

# Required for logical replication
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

Restart PostgreSQL after changing these settings.

2.2 Install wal2json Extension

# Ubuntu/Debian
sudo apt-get install postgresql-16-wal2json

# macOS with Homebrew
brew install wal2json

# Or build from source
git clone https://github.com/eulerto/wal2json.git
cd wal2json
make && sudo make install

2.3 Create Database and Replication User

# Connect as superuser
psql -U postgres
-- Create the source database
CREATE DATABASE walstreamdb;

-- Connect to it
\c walstreamdb

-- Create events table for testing
CREATE TABLE IF NOT EXISTS events (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create replication user
CREATE USER repluser WITH REPLICATION LOGIN PASSWORD 'replpass';

-- Grant permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repluser;
GRANT USAGE ON SCHEMA public TO repluser;

-- Verify wal2json is installed
SELECT * FROM pg_available_extensions WHERE name = 'wal2json';

2.4 Configure pg_hba.conf

Add this line to allow replication connections:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     repluser        127.0.0.1/32            md5
host    replication     repluser        ::1/128                 md5

Reload PostgreSQL:

sudo systemctl reload postgresql
# or
pg_ctl reload

3. PostgreSQL Setup (Control Plane Database)

psql -U postgres
-- Create control plane database
CREATE DATABASE walstream_control;

-- Connect to it
\c walstream_control

-- Create dedup schema
CREATE SCHEMA IF NOT EXISTS walstream_dedup;

-- Create processed events table for deduplication
CREATE TABLE IF NOT EXISTS walstream_dedup.processed_events (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    processed_at TIMESTAMPTZ DEFAULT NOW(),
    job_id VARCHAR(36),
    expires_at TIMESTAMPTZ DEFAULT NOW() + INTERVAL '7 days'
);

CREATE INDEX IF NOT EXISTS idx_processed_events_expires
    ON walstream_dedup.processed_events(expires_at);

4. Run Alembic Migrations (Required Before First Start)

The control plane will fail fast on startup if the database schema is behind the expected Alembic head revision. Always run migrations before starting the control service:

cd control
alembic upgrade head
cd ..

# Verify (optional)
cd control && alembic current && cd ..

Or via Makefile:

make migrate          # runs: cd control && alembic upgrade head
make migrate-check    # shows current DB revision

5. Start Redis and Kafka

# Using Docker for infrastructure only
docker-compose up -d redis zookeeper kafka

Or install locally:

6. Configure Environment

Update .env with your local settings:

# Source database (with WAL)
POSTGRES_HOST=localhost
POSTGRES_USER=repluser
POSTGRES_PASSWORD=replpass
POSTGRES_DB=walstreamdb

# Control plane database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/walstream_control

# Redis
REDIS_URL=redis://localhost:6379/0

# Kafka
KAFKA_BROKER=localhost:29092

Create frontend local env from template:

cp frontend/.env.example frontend/.env.local

7. Start Services (Local Development)

# Terminal 1: Control plane
cd control && uvicorn app.main:app --reload --port 8000

# Terminal 2: Ingestor
python ingestor/ingestor.py

# Terminal 3: Replayer
python replayer/server.py

# Terminal 4: Frontend (optional)
cd frontend && npm install && npm run dev

Verify CDC is Working

1. Create Test Data

# Insert data into source database
docker exec walstream-postgres psql -U postgres -d walstreamdb -c \
  "INSERT INTO events (name, payload) VALUES ('test', '{\"key\": \"value\"}');"

2. Check Ingestor Logs

docker logs -f walstream-ingestor
# Should show: Ingested: INSERT on public.events (LSN: ...)

3. Check Redis Stream

docker exec walstream-redis redis-cli XLEN walstream:events
# Should show count > 0

docker exec walstream-redis redis-cli XRANGE walstream:events - + COUNT 1
# Should show the event

4. Check Kafka Topic

docker exec walstream-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic walstream.archive \
  --from-beginning \
  --max-messages 1

Troubleshooting

"FATAL: no pg_hba.conf entry for replication connection"

Add replication entry to pg_hba.conf (see step 2.4 above).

"wal2json not found"

Install the wal2json extension for your PostgreSQL version.

"replication slot already exists"

-- List existing slots
SELECT * FROM pg_replication_slots;

-- Drop if needed
SELECT pg_drop_replication_slot('walstream_slot');

"could not access file 'wal2json'"

Ensure wal2json is installed in PostgreSQL's lib directory and restart PostgreSQL.

API Usage

Authentication

# Register a user
curl -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "email": "admin@example.com", "password": "secret123"}'

# Get token
TOKEN=$(curl -X POST http://localhost:8000/api/v1/auth/token \
  -d "username=admin&password=secret123" | jq -r '.access_token')

# Service-to-service auth (if an API key is provisioned)
curl http://localhost:8000/api/v1/jobs \
  -H "X-API-Key: $API_KEY"

Replay Jobs

# Create replay job
curl -X POST http://localhost:8000/api/v1/jobs \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "start_time": "2024-01-15T00:00:00Z",
    "end_time": "2024-01-15T01:00:00Z",
    "speed_factor": 2.0
  }'

# Start the job
curl -X POST http://localhost:8000/api/v1/jobs/{job_id}/start \
  -H "Authorization: Bearer $TOKEN"

# Check job status
curl http://localhost:8000/api/v1/jobs/{job_id} \
  -H "Authorization: Bearer $TOKEN"

# Pause a running job
curl -X POST http://localhost:8000/api/v1/jobs/{job_id}/pause \
  -H "Authorization: Bearer $TOKEN"

# Resume a paused job
curl -X POST http://localhost:8000/api/v1/jobs/{job_id}/resume \
  -H "Authorization: Bearer $TOKEN"

# Cancel a job
curl -X POST http://localhost:8000/api/v1/jobs/{job_id}/cancel \
  -H "Authorization: Bearer $TOKEN"

WebSocket Events

const token = '<jwt>';
const ws = new WebSocket(`ws://localhost:8000/api/v1/events/ws?token=${token}`);
ws.onmessage = (event) => console.log(JSON.parse(event.data));

Project Structure

CDC-FastAPI/
├── frontend/                  # Next.js 14 web dashboard
│   ├── src/
│   │   ├── app/               # App Router pages
│   │   │   ├── (auth)/        # Auth layout (login)
│   │   │   ├── (dashboard)/   # Dashboard layout (protected)
│   │   │   └── api/           # API routes (metrics proxy)
│   │   ├── components/        # React components
│   │   │   ├── ui/            # shadcn/ui components
│   │   │   ├── layout/        # Sidebar, Header, ThemeToggle
│   │   │   ├── auth/          # AuthGuard
│   │   │   ├── errors/        # ErrorBoundary, ErrorFallback
│   │   │   ├── jobs/          # Job management
│   │   │   ├── events/        # Event stream
│   │   │   └── metrics/       # Charts & gauges
│   │   ├── hooks/             # Custom React hooks
│   │   ├── stores/            # Zustand stores (auth, events, ui)
│   │   ├── lib/               # API client, constants, utilities
│   │   ├── types/             # TypeScript types
│   │   └── middleware.ts      # Auth route protection
│   ├── Dockerfile             # Multi-stage Docker build
│   ├── .env.example           # Frontend environment template
│   └── package.json
│
├── walstream-proto/           # Protobuf definitions & Pydantic models
│   ├── proto/v1/              # .proto files (ChangeRecord, ReplayRequest, etc.)
│   ├── walstream_proto/       # Python package
│   │   ├── models.py          # Pydantic models for REST API
│   │   ├── generate.py        # Proto generation script
│   │   └── v1/                # Generated protobuf code
│   └── tests/                 # Compatibility tests
│
├── ingestor/                  # WAL capture & dual-write service
│   └── ingestor.py            # Main ingestor with wal2json v2 parsing
│
├── control/                   # FastAPI control plane
│   ├── app/
│   │   ├── main.py            # FastAPI app entry point
│   │   ├── config.py          # Pydantic Settings configuration
│   │   ├── database.py        # Async SQLAlchemy setup
│   │   ├── api/
│   │   │   ├── deps.py        # Dependency injection (auth, db)
│   │   │   └── v1/            # REST endpoints
│   │   │       ├── auth.py    # Authentication (JWT)
│   │   │       ├── jobs.py    # Replay job management
│   │   │       ├── events.py  # WebSocket streaming
│   │   │       └── health.py  # Health checks
│   │   ├── middleware/        # Auth context, audit, and rate limiting
│   │   ├── models/            # SQLAlchemy ORM models
│   │   │   ├── replay_job.py  # ReplayJob with lease support
│   │   │   ├── user.py        # User model
│   │   │   ├── dedup_state.py # Deduplication state
│   │   │   ├── api_key.py     # API key model
│   │   │   └── audit_log.py   # Audit log model
│   │   ├── services/          # Business logic
│   │   │   ├── replay_router.py  # Redis/Kafka source routing
│   │   │   ├── job_service.py    # Job lifecycle transaction logic
│   │   │   └── dedup_store.py    # Legacy dedup utility (not in active replay path)
│   │   └── workers/           # Background workers
│   │       ├── job_worker.py  # Durable job execution
│   │       └── manager.py     # Worker pool manager
│   ├── alembic/               # Database migrations
│   └── tests/                 # Test suite
│
├── replayer/                  # gRPC replay service
│   └── server.py              # Replayer with centralized dedup
│
├── db/                        # Database initialization
│   └── init.sql               # PostgreSQL setup script
│
├── monitoring/                # Observability
│   └── prometheus.yml         # Prometheus configuration
│
├── docker-compose.yml         # Complete stack definition
├── pyproject.toml             # Root project configuration
└── .env.example               # Environment template

Configuration

See .env.example for all configuration options. Key settings:

| Variable | Description | Default |
| --- | --- | --- |
| DATABASE_URL | Control plane PostgreSQL | postgresql+asyncpg://... |
| REDIS_URL | Redis connection | redis://localhost:6379/0 |
| REDIS_STREAM_MAXLEN | Max events in Redis | 100000 |
| KAFKA_BROKER | Kafka bootstrap server | localhost:9092 |
| KAFKA_TOPIC | Archive topic name | walstream.archive |
| REPLAYER_HOST | gRPC replayer host | localhost |
| REPLAYER_PORT | gRPC replayer port | 50051 |
| JOB_WORKER_COUNT | Parallel job workers | 4 |
| JOB_LEASE_DURATION_SECONDS | Worker lease TTL | 300 |
| MAX_EVENT_RETRIES | Retries per event (after initial attempt) before job fails | 3 |
| SECRET_KEY | JWT signing key (change in production) | |

Frontend Configuration (frontend/.env.local)

| Variable | Description | Default |
| --- | --- | --- |
| NEXT_PUBLIC_API_URL | Backend API URL | http://localhost:8000 |
| NEXT_PUBLIC_WS_URL | WebSocket URL | ws://localhost:8000 |
| PROMETHEUS_INGESTOR_URL | Ingestor metrics | http://localhost:9090/metrics |
| PROMETHEUS_CONTROL_URL | Control metrics | http://localhost:9091/metrics |
| PROMETHEUS_REPLAYER_URL | Replayer metrics | http://localhost:9092/metrics |

Delivery Semantics

At-Least-Once with Idempotency:

  1. Events may be delivered multiple times due to retries or worker restarts
  2. Replayer enforces centralized deduplication using (lsn, table, pk_hash) key
  3. Replayer marks dedup state only after successful apply (exists -> apply -> mark_processed)
  4. Dedup state stored in PostgreSQL with 7-day TTL
  5. Workers trust Replayer for dedup - no local dedup to avoid race conditions
  6. Worker retries each failed event up to MAX_EVENT_RETRIES times after the initial attempt, then fails the job without moving checkpoint past that event
  7. Kafka resume checkpoints are tracked per partition in JSON (checkpoint) and serialized back into the replay source on resume
  8. Resumed jobs reuse their stored replay source; Kafka partitions without a resume offset seek by timestamp, then to end if no in-range offset exists
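The exists -> apply -> mark_processed ordering is the crux of points 2, 3, and 5. A minimal sketch, with an in-memory set standing in for the PostgreSQL dedup table (illustrative, not replayer/server.py):

```python
import hashlib

def idempotency_key(lsn: str, table: str, pk_hash: str) -> str:
    """Dedup key derived from (lsn, table, pk_hash), as described above."""
    return hashlib.sha256(f"{lsn}:{table}:{pk_hash}".encode()).hexdigest()

class Replayer:
    """Centralized dedup: exists -> apply -> mark_processed."""

    def __init__(self, apply_fn):
        self._processed: set[str] = set()  # stands in for the dedup table
        self._apply = apply_fn

    def replay(self, lsn: str, table: str, pk_hash: str, event) -> str:
        key = idempotency_key(lsn, table, pk_hash)
        if key in self._processed:   # exists: already applied once, skip
            return "duplicate"
        self._apply(event)           # apply: may raise; caller retries
        self._processed.add(key)     # mark only after a successful apply
        return "applied"
```

Because marking happens only after a successful apply, a failed apply leaves the key unmarked, so the retry is treated as a fresh attempt rather than a duplicate.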

Idempotent Producer:

  • Kafka producer uses enable_idempotence=True to prevent duplicates from retries

Monitoring

Start the monitoring stack:

docker-compose --profile monitoring up -d

Key Metrics

| Metric | Description |
| --- | --- |
| walstream_events_ingested_total | Events captured from WAL |
| walstream_events_published_redis_total | Events written to Redis |
| walstream_events_published_kafka_total | Events written to Kafka |
| walstream_events_replayed_total | Events successfully replayed |
| walstream_events_duplicates_total | Duplicate events skipped |
| walstream_events_failed_total | Failed replay attempts |
| walstream_ingest_latency_seconds | WAL-to-publish latency |
| walstream_replay_latency_seconds | Per-event replay latency |
| walstream_jobs_active | Active jobs, labeled by state (queued or running) |
| walstream_job_lease_expired{state="running"} | Running jobs with expired worker leases |

SLO Targets & Alert Thresholds

| SLO | Target | Alert Threshold | Alert Name | Severity |
| --- | --- | --- | --- | --- |
| Ingest availability | Events flowing continuously | 0 events/5min | IngestorStalled | critical |
| Ingest latency (p99) | < 1s WAL-to-publish | p99 > 1s for 5min | HighIngestLatency | warning |
| Replay availability | Events replayed while jobs are active | 0 events/10min with queued/running jobs | ReplayerStalled | critical |
| Replay success rate | > 95% | Failure rate > 5% for 5min | ReplayerHighFailureRate | warning |
| Replay latency (p99) | < 500ms | p99 > 500ms for 5min | HighReplayLatency | warning |
| Redis buffer headroom | < 90% of MAXLEN | > 90,000 entries for 2min | RedisStreamNearCapacity | warning |
| Kafka consumer lag | < 10,000 messages | > 10,000 for 5min | KafkaConsumerLagCritical | critical |
| Job lease integrity | No orphaned leases | Running job with expired lease for 1min | JobLeaseExpiredWithRunningState | warning |
| DB schema alignment | Alembic head matches code | Checked at startup; mismatch = fail-fast | ControlPlaneDown (startup failure symptom) | critical |

Alert rules are defined in monitoring/alertmanager/rules/walstream.yml. Alertmanager routes critical alerts to PagerDuty and warning alerts to Slack (configure in monitoring/alertmanager/alertmanager.yml).

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=control --cov=ingestor --cov=replayer

# Format code
ruff format .

# Lint code
ruff check .

# Type check
mypy control/ ingestor/ replayer/

Code Style

This project follows the Google Python Style Guide. See google_python_style_guide.md for a local reference.


Implementation Checklist

Items marked [x] are complete; [ ] items remain.


Backend — Phase 0: Critical Fixes

  • db/init.sql — fixed syntax, idempotent user creation, includes dedup schema
  • Removed legacy standalone worker/ utilities (superseded by control/app/workers/)
  • Django replaced with FastAPI control plane; legacy Django scaffolding removed

Backend — Phase 1: Protobuf Packaging & Versioning

  • walstream-proto/ package with proto/v1/ directory structure
  • walstream.proto with reserved field ranges and versioning strategy
  • Generated pb2 files: walstream_pb2.py and walstream_pb2_grpc.py present under walstream-proto/walstream_proto/v1/
  • Pydantic models (walstream_proto/models.py) with from_protobuf / to_protobuf / idempotency_key
  • Compatibility tests (walstream-proto/tests/test_compatibility.py)

Backend — Phase 2: Ingestor Enhancement

  • Full wal2json format-version 2 parsing (parse_wal2json_v2) with operation/old/new extraction
  • Redis XADD with MAXLEN trimming (100K, approximate)
  • Prometheus metrics (7 metrics: ingested, published, errors, lag, latency)
  • Signal handling (SIGTERM/SIGINT) with graceful shutdown and auto-restart loop
  • Kafka idempotent producer (enable_idempotence=True)
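Publishing to the Redis live buffer with approximate MAXLEN trimming can be sketched as follows (the to_stream_fields helper and its field names are illustrative, not the repo's exact schema; the commented XADD call shows the redis.asyncio API shape):

```python
import json

def to_stream_fields(change: dict) -> dict[str, str]:
    """Flatten a change record into string-valued Redis stream fields."""
    return {
        "operation": change["operation"],
        "table": f'{change["schema"]}.{change["table"]}',
        "payload": json.dumps(change),
    }

# With redis.asyncio, publishing with approximate MAXLEN trimming looks like:
#
#   await r.xadd("walstream:events", to_stream_fields(change),
#                maxlen=100_000, approximate=True)
#
# where `r` is a connected redis.asyncio.Redis client. Approximate trimming
# (XADD ... MAXLEN ~ 100000) lets Redis trim in whole macro-nodes, which is
# far cheaper than exact trimming.

print(to_stream_fields({"operation": "INSERT", "schema": "public",
                        "table": "events", "new": {"id": 1}}))
```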

Backend — Phase 3: FastAPI Control Plane

  • control/app/config.py — Pydantic Settings with all env vars
  • control/app/database.py — async SQLAlchemy engine + session factory
  • control/app/models/ — User, ReplayJob (with lease fields), ProcessedEvent (dedup)
  • control/app/api/v1/jobs.py — full CRUD + start/pause/resume/cancel endpoints
  • control/app/api/v1/auth.py — JWT token, register, /me
  • control/app/api/v1/events.py — WebSocket live streaming, /stream/stats, /recent
  • control/app/api/v1/health.py — liveness + readiness probes with component checks
  • control/app/services/replay_router.py — Redis/Kafka source selection with source-pinned resume and safe partition seek behavior
  • control/app/services/dedup_store.py — retained legacy utility module (active dedup is centralized in Replayer)
  • control/app/workers/job_worker.py — lease-based acquisition, renewal, fail-fast replay, and per-partition checkpoint/resume
  • control/app/workers/manager.py — worker pool manager
  • Alembic migrations directory scaffold
  • Alembic revision files for controlled schema evolution (3 migrations: initial, RBAC+audit, API keys)
  • Remove production startup create_all path; use migration-only schema bootstrap
  • Add startup DB revision guard (fail fast when DB is behind expected Alembic head)
  • Pause/cancel API transitions must clear worker_id + lease_expires_at atomically
  • Separate read/write DB session policy (no implicit commit on read-only request paths)
  • Service-layer transaction boundaries for multi-step job state transitions

Backend — Phase 4: gRPC Replayer

  • replayer/server.py — centralized dedup enforcement (exists -> apply -> mark_processed)
  • PostgreSQLTargetApplier — INSERT/UPDATE/DELETE/TRUNCATE with ON CONFLICT handling
  • LoggingTargetApplier — dry-run testing
  • Health RPC endpoint
  • Prometheus metrics (replayed, duplicates, failed, latency)
  • Automatic dedup entry cleanup task

Backend — Phase 5: Security & Authentication

  • JWT authentication (OAuth2PasswordBearer, token generation/validation)
  • User model with is_active, is_superuser flags
  • CORS middleware configured (origins via settings)
  • Middleware stale-token handling with deterministic recovery path (AuthContextMiddleware + X-Token-Expired header)
  • Align middleware/API auth validation behavior (AuthContextMiddleware feeds identity to audit; deps distinguish expired vs missing tokens)
  • Role-based authorization (RBAC) — admin/operator/viewer hierarchy with require_role dependency
  • Audit logging: AuditLog model and middleware for mutation tracking
  • Rate limiting middleware
  • API key support for service-to-service auth

Backend — Phase 6: Observability & Alerting

  • Prometheus metrics on all services (ingestor :9090, control :9091, replayer :9092)
  • monitoring/prometheus.yml scrape config
  • Prometheus + Grafana in docker-compose (monitoring profile)
  • Alerting rules (monitoring/alertmanager/rules/walstream.yml) — IngestorStalled, RedisStreamNearCapacity, ReplayerStalled, KafkaConsumerLagCritical, etc.
  • Alerts for replay lease/state inconsistency invariants and control-plane availability
  • Alertmanager configuration — Slack/PagerDuty routing, severity grouping
  • Grafana dashboards — provisioned JSON dashboards for all services
  • SLO documentation — alert thresholds, SLO targets, runbook URLs

Backend — Phase 7: Testing

  • walstream-proto/tests/test_compatibility.py — proto serialization + Pydantic tests
  • control/tests/test_health.py — basic health endpoint tests
  • Unit tests: test_parse_wal2json_v2, test_idempotency_key, test_replay_router_source_selection, test_rbac, test_middleware
  • Integration tests: test_job_lifecycle (CRUD + full state machine through API), test_auth (endpoint auth enforcement)
  • Failure mode tests — bounded retries, retry exhaustion, checkpoint non-advancement on failure (test_delivery_semantics)
  • Delivery semantics tests — replayer dedup contract (exists→apply→mark), duplicate skipped, failed apply not marked
  • Job lifecycle invariant tests — pause/cancel transitions clear lease ownership (test_job_service)
  • Migration strategy tests — startup fails when DB revision is behind expected Alembic head (test_migration_guard)
  • Replay resume matrix tests — Redis checkpoint, Kafka single/multi-partition checkpoint, sparse partition checkpoint (test_replay_resume)
  • Kafka range-boundary tests — partitions with no in-range timestamp offset seek to end (test_replay_resume)
  • Auth consistency tests — stale token + expired token returns 401 with X-Token-Expired, health is public (test_auth)
  • E2E tests — full pipeline: INSERT → ingest → replay → verify in target

Backend — Phase 8: Docker & DevEx

  • Dockerfiles for all services (ingestor, control, replayer, frontend)
  • docker-compose.yml — complete stack with health checks, volumes, networking
  • Makefile: make up, make test, make proto, make lint, make migrate, make migrate-check, make test-delivery
  • Clean up legacy Django files — removed control/walstream/, control/replay/, and control/manage.py
  • .env.example — root-level env template for easy onboarding
  • Migration-first local/dev startup docs and scripts (no runtime schema auto-create in production path)

Frontend — Phase 1: Foundation

  • Next.js 14 project with TypeScript, Tailwind CSS, shadcn/ui (19 components)
  • API client (lib/api/client.ts) with JWT interceptors and 401 redirect
  • Auth store (stores/authStore.ts) with Zustand persist + cookie sync
  • Login page with Zod validation and react-hook-form
  • Auth middleware (middleware.ts) + client-side AuthGuard component
  • Dashboard layout with Sidebar and Header
  • React Query provider (lib/providers.tsx)

Frontend — Phase 2: Job Management

  • Job TypeScript types (types/job.ts) with all plan fields + extras
  • Jobs API functions (lib/api/jobs.ts) — full CRUD + lifecycle actions
  • useJobs React Query hooks with dynamic polling intervals
  • JobsTable — sortable, filterable, paginated with inline actions
  • JobStatusBadge — colored status indicators
  • JobActions — start/pause/resume/cancel/delete with confirmation dialogs
  • JobDetailPanel — full info, progress, timeline, stats grid
  • JobCreateForm — Zod schema, time range presets, speed factor

Frontend — Phase 3: Real-time Events

  • useWebSocket hook with auto-reconnect, status tracking, exponential backoff
  • Event store (stores/eventStore.ts) — Zustand, 1000-event FIFO, filtering, pause
  • EventStream — virtualized list with @tanstack/react-virtual
  • EventCard — operation-colored event display
  • EventFilters — table name, operation type, search query
  • EventDetailModal — full payload view with tabs
  • ConnectionStatus — WebSocket status indicator

Frontend — Phase 4: Metrics Dashboard

  • Prometheus text parser (lib/utils/prometheus-parser.ts) with histogram percentiles
  • Metrics API route (app/api/metrics/route.ts) — proxies to 3 Prometheus endpoints
  • useMetrics hooks — raw metrics, snapshot, history with rate calculations
  • EventsLineChart — ingested vs replayed over time
  • OperationsBarChart — horizontal bars by operation type
  • LatencyChart — p50/p90/p99 bar chart
  • ThroughputGauge — SVG gauge with events/sec
  • ReplayStatusChart, RedisStreamCard, StatsCard — extra components
  • Auto-refresh via React Query refetchInterval

Frontend — Phase 5: Polish & Integration

  • Dashboard overview page with real metrics, health status, recent jobs
  • Toast notifications (shadcn/ui Toaster)
  • Error boundaries (ErrorBoundary, ErrorFallback, per-route error.tsx)
  • not-found.tsx — custom 404 page
  • Docker config (multi-stage Dockerfile, .dockerignore)
  • Stale-cookie session handling UX alignment between middleware guards and API 401 handling
  • Degraded readiness/503 UI states for dashboard health cards and overview widgets
  • Dark/light mode toggle (ThemeToggle.tsx) — next-themes with system/light/dark support
  • Mobile responsive sidebar — hamburger menu with overlay on small screens
  • Extracted dashboard components: OverviewCards, RecentJobs, HealthStatus pulled out of page.tsx
  • UI store (stores/uiStore.ts) — sidebar open/close state for mobile
  • Utility files: lib/utils/formatters.ts, lib/constants.ts
  • Frontend .env.example — environment template for developers
  • useDebounce hook — for search input debouncing in EventFilters
  • Dynamic imports for chart components (code splitting with next/dynamic)

Frontend — Testing

  • Vitest + Testing Library — component unit tests
  • Hook tests: renderHook for useJobs, useWebSocket, useMetrics
  • MSW integration tests — API client with mock service worker
  • Middleware/auth tests — stale cookie + invalid token redirect behavior
  • Health contract tests — flat keys (postgres/redis/kafka) + components map compatibility
  • Playwright E2E (optional) — login flow, job lifecycle, event filtering

Not In Original Plan (Bonus Features Completed)

  • useHealth hook and health API client
  • useEvents hook for REST event fetching
  • Auth cookie sync for server-side middleware
  • RedisStreamCard — Redis buffer visualization
  • ReplayStatusChart — replay/duplicate/failed breakdown chart
  • StatsCard — generic reusable metrics card
  • Per-route error pages (app/error.tsx, app/(dashboard)/error.tsx)

License

MIT
