Small FastAPI service that ingests webhook payloads, stores them durably, and processes them asynchronously in a background worker.
- Idempotent ingestion using `X-Idempotency-Key` scoped by `X-Source-Id`.
- Durable persistence in PostgreSQL with schema migrations via Alembic.
- Background processing using Dramatiq backed by Redis.
- Containerized local dev environment with Docker Compose.
- Integration/concurrency testing against a running service.

Tech stack: FastAPI, SQLAlchemy (sync), Alembic, PostgreSQL, Redis, Dramatiq, Docker Compose.

`POST /webhooks` writes a webhook record to PostgreSQL and enqueues a Dramatiq job in Redis. A separate worker process consumes jobs and updates the webhook status. You can query the current state with `GET /webhooks/{id}`.

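
Because processing happens in a separate worker, a client that needs the final outcome has to poll `GET /webhooks/{id}`. The following is a minimal client-side sketch, not part of the service: the helper names (`fetch_webhook`, `wait_for_terminal_status`), the polling interval, and the attempt limit are hypothetical, and treating `done` and `failed` as terminal is an assumption based on the status values this README lists.

```python
import json
import time
import urllib.request
from typing import Callable

# Assumed terminal statuses, taken from the values listed for GET /webhooks/{id}.
TERMINAL_STATUSES = {"done", "failed"}


def fetch_webhook(webhook_id: str, base_url: str = "http://localhost:8000") -> dict:
    """GET /webhooks/{id} and decode the JSON body."""
    url = f"{base_url}/webhooks/{webhook_id}"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def wait_for_terminal_status(
    webhook_id: str,
    fetch: Callable[[str], dict] = fetch_webhook,
    interval: float = 0.5,
    max_attempts: int = 20,
) -> dict:
    """Poll until the webhook reaches a terminal status, else raise TimeoutError."""
    for attempt in range(max_attempts):
        record = fetch(webhook_id)
        if record["status"] in TERMINAL_STATUSES:
            return record
        # Sleep between polls, but not after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    raise TimeoutError(f"webhook {webhook_id} not finished after {max_attempts} polls")
```

The `fetch` parameter is injectable so the loop can be exercised without a running service, for example by passing a function that returns canned records.
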

Prerequisites:

- Docker + Docker Compose v2 (`docker compose`)
- curl

Create the environment file and start the services:

    cp .env.example .env
    docker compose up -d --build

Run database migrations (creates tables/types):

    docker compose exec web alembic upgrade head

Stop services:

    docker compose down

Optional: stop and remove volumes:

    docker compose down -v

Health check:

    curl -sS http://localhost:8000/health

Expected:

    {"status":"ok"}

OpenAPI:

- Swagger UI: http://localhost:8000/docs
- OpenAPI JSON: http://localhost:8000/openapi.json

The API supports idempotency via `X-Idempotency-Key` (recommended) and client scoping via `X-Source-Id` (optional; defaults to `DEFAULT_SOURCE_ID`).

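
The idempotency rules can be sketched in a few lines of Python: a first request is stored, an identical repeat under the same source and key is replayed, and the same key with a different payload is rejected with `409`. This is an in-memory illustration only, not the service's code. The class and function names are hypothetical, and computing the fingerprint as SHA-256 over key-sorted compact JSON is an assumption (the README only says the fingerprint is a SHA-256 value).

```python
import hashlib
import json
import uuid
from dataclasses import dataclass


@dataclass
class Result:
    status_code: int
    body: dict
    replayed: bool = False  # stands in for the X-Idempotency-Replayed header


def fingerprint(payload: dict) -> str:
    """SHA-256 over a canonical JSON encoding (assumed canonicalization)."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for the database; not safe for concurrent use."""

    def __init__(self) -> None:
        self._records: dict = {}

    def ingest(self, source_id: str, idempotency_key: str, payload: dict) -> Result:
        fp = fingerprint(payload)
        scope = (source_id, idempotency_key)  # the key is scoped by source id
        existing = self._records.get(scope)
        if existing is None:
            body = {"id": str(uuid.uuid4()), "status": "pending", "fingerprint": fp}
            self._records[scope] = body
            return Result(202, body)
        if existing["fingerprint"] == fp:
            # Same key, same payload: replay the original response.
            return Result(202, existing, replayed=True)
        # Same key, different payload: conflict.
        problem = {
            "title": "Conflict",
            "status": 409,
            "detail": "idempotency key reused with a different payload",
            "instance": "/webhooks",
        }
        return Result(409, problem)
```

A dictionary lookup followed by a write is not atomic, so a real implementation under concurrent requests would need the uniqueness check enforced by the datastore rather than in application memory.
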

- Ingest a webhook payload (returns `202` and a webhook id):

      curl -sS -i -X POST http://localhost:8000/webhooks \
        -H 'Content-Type: application/json' \
        -H 'X-Source-Id: demo-client' \
        -H 'X-Idempotency-Key: demo-123' \
        -d '{"hello":"world"}'

  Expected body shape:

      {"id":"<uuid>","status":"pending","fingerprint":"<sha256>"}

- Resend the exact same request (same `X-Source-Id`, same idempotency key, same JSON):

      curl -sS -i -X POST http://localhost:8000/webhooks \
        -H 'Content-Type: application/json' \
        -H 'X-Source-Id: demo-client' \
        -H 'X-Idempotency-Key: demo-123' \
        -d '{"hello":"world"}'

  Expected:

  - Status still `202`
  - Response body is the same as the first request
  - Header `X-Idempotency-Replayed: true`

- Reuse the same key with a different payload (should be rejected with `409`):

      curl -sS -i -X POST http://localhost:8000/webhooks \
        -H 'Content-Type: application/json' \
        -H 'X-Source-Id: demo-client' \
        -H 'X-Idempotency-Key: demo-123' \
        -d '{"hello":"DIFFERENT"}'

  Expected `application/problem+json` shape:

      {"title":"Conflict","status":409,"detail":"...","instance":"/webhooks"}

- Fetch the webhook status (copy the `<uuid>` from step 1):

  Status may take a moment to change as the worker processes jobs.

      curl -sS http://localhost:8000/webhooks/<uuid>

  Expected body shape:

      {"id":"<uuid>","source_id":"demo-client","idempotency_key":"demo-123","fingerprint":"<sha256>","status":"pending|processing|done|failed","payload":{}}

The app reads configuration from environment variables (see `.env.example`). Key variables used by the application:

| Variable | Used by | Purpose |
|---|---|---|
| `DATABASE_URL` | `src/app/config.py`, Alembic | PostgreSQL connection string (SQLAlchemy URL). |
| `REDIS_URL` | `src/app/config.py`, integration tests | Redis URL used by the worker stack and tests. |
| `DRAMATIQ_BROKER_URL` | `src/app/config.py` | Dramatiq broker URL (defaults to `REDIS_URL`). |
| `DEFAULT_SOURCE_ID` | `src/app/config.py` | Fallback `source_id` when `X-Source-Id` is not provided. |
| `LOG_LEVEL` | `src/app/config.py` | Logging verbosity (e.g., `INFO`). |
| `ENVIRONMENT` | `src/app/config.py` | Environment label (e.g., `development`). |
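
To illustrate how these variables fit together, here is a small sketch of loading them in Python, including the documented fallback of `DRAMATIQ_BROKER_URL` to `REDIS_URL`. It is not the contents of `src/app/config.py`: the `Settings` class and `load_settings` function are hypothetical, and both the choice of which variables are required and the default values (`"default"`, `"INFO"`, `"development"`) are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    dramatiq_broker_url: str
    default_source_id: str
    log_level: str
    environment: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping (defaults to the process environment)."""
    env = os.environ if env is None else env
    # Assumption: the two connection URLs are required; a missing one raises KeyError.
    redis_url = env["REDIS_URL"]
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=redis_url,
        # Documented behavior: the broker URL falls back to REDIS_URL.
        dramatiq_broker_url=env.get("DRAMATIQ_BROKER_URL") or redis_url,
        # The defaults below are illustrative guesses, not values from the project.
        default_source_id=env.get("DEFAULT_SOURCE_ID", "default"),
        log_level=env.get("LOG_LEVEL", "INFO"),
        environment=env.get("ENVIRONMENT", "development"),
    )
```

Passing a plain dictionary instead of relying on `os.environ` keeps the loader easy to test in isolation.
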

Run the full suite locally:

    pytest -q

Or use the provided helper (also tees output to `logs/pytest.log`):

    ./scripts/run-tests.sh

Concurrency/idempotency test (expects a running web + worker + postgres + redis):

    pytest -q tests/integration/test_idempotency_concurrency.py

Troubleshooting:

- Containers not starting or unhealthy:

      docker compose ps
      docker compose logs -f postgres redis web worker

- Port already in use (common: Postgres): edit `POSTGRES_HOST_PORT` in `.env` (default is `5433`).

- Migrations failing:

      docker compose exec web alembic upgrade head

- Reset everything (drops volumes/data):

      docker compose down -v
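
For readers curious what a concurrency/idempotency check like the integration test listed above might verify, the sketch below fires identical requests in parallel and asserts that they all resolve to a single webhook id. It is an assumption about the shape of such a test, not the contents of `tests/integration/test_idempotency_concurrency.py`; all function names are hypothetical, and `post_webhook` needs the stack running on `localhost:8000`.

```python
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple

Response = Tuple[int, dict]  # (HTTP status code, decoded JSON body)


def post_webhook(
    payload: dict,
    source_id: str = "demo-client",
    idempotency_key: str = "demo-123",
    base_url: str = "http://localhost:8000",
) -> Response:
    """POST /webhooks with the headers used in the README examples."""
    request = urllib.request.Request(
        f"{base_url}/webhooks",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Source-Id": source_id,
            "X-Idempotency-Key": idempotency_key,
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as resp:
        return resp.status, json.load(resp)


def fire_concurrently(send: Callable[[], Response], n: int = 20) -> List[Response]:
    """Call send() from n threads at once and collect every response."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(send) for _ in range(n)]
        return [future.result() for future in futures]


def check_single_webhook(responses: List[Response]) -> str:
    """Assert every response is 202 with the same id; return that id."""
    statuses = {status for status, _ in responses}
    ids = {body["id"] for _, body in responses}
    assert statuses == {202}, f"unexpected statuses: {statuses}"
    assert len(ids) == 1, f"expected one webhook id, got {len(ids)}"
    return ids.pop()
```

Against a running stack, `check_single_webhook(fire_concurrently(lambda: post_webhook({"hello": "world"})))` would exercise the same-key, same-payload path from many threads at once.
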