This guide is for a junior engineer who wants to become “pro‑level” at understanding how this project works end‑to‑end.
It explains:
- What technologies are used (FastAPI, Starlette/ASGI, PostgreSQL, asyncpg, Prometheus, Alembic, pytest, etc.)
- How HTTP requests flow through the app
- How jobs move through the queue (enqueue → lease → ack/nack → DLQ)
- Where to look in the codebase when you are debugging or extending features
You can read this file top‑to‑bottom once, then treat it as a reference.
- OpenQueue is a hosted job queue service.
- Producers send HTTP requests to enqueue jobs.
- Workers send HTTP requests to lease jobs, process them, and ack/nack results.
- All jobs are stored in PostgreSQL, not in memory.
- The service is implemented as a FastAPI application running on an ASGI server (like `uvicorn`) and uses:
  - Starlette (the low‑level web framework FastAPI sits on)
- async/await for concurrency
- asyncpg for asynchronous Postgres access
- Prometheus metrics and health endpoints for observability
- Alembic for database migrations
Mentally, imagine three boxes:
- Clients (your apps + your workers) → send HTTP requests.
- FastAPI service (this repo) → authenticates, validates, applies business rules, talks to DB.
- PostgreSQL → actual storage and queue logic via SQL + row locking.
- ASGI = Asynchronous Server Gateway Interface.
- It is a standard that defines how async Python web servers (like `uvicorn`) talk to Python applications.
- An ASGI app is basically a callable that the server calls for every request.
- Why it matters here:
  - Because ASGI is async, you can use `async def` endpoints and non‑blocking DB drivers like `asyncpg`.
  - Middleware, lifespan events, and WebSockets all build on top of ASGI.
You generally don’t write pure ASGI in this project – FastAPI and Starlette hide the low‑level details, but they are built on top of it.
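For intuition, here is what a bare ASGI app looks like — a sketch independent of this repo, driven by hand‑written `receive`/`send` callables the way a server like `uvicorn` would call it:

```python
import asyncio

# A minimal "raw" ASGI application — the kind of callable FastAPI/Starlette
# are built on top of. This is an illustrative sketch, not code from this repo.
async def asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"hello"})

# Drive the app by hand with fake receive/send callables, as a server would.
async def call_once():
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await asgi_app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return sent

messages = asyncio.run(call_once())
print(messages[0]["status"])  # 200
```

FastAPI endpoints, middleware, and dependencies are all conveniences layered over this one callable.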
- Starlette is a lightweight ASGI framework:
- Routing
- Middleware
- Request/response objects
- Background tasks, WebSockets, etc.
- In this project, you see Starlette concepts mainly in:
  - `Request`/`Response` objects
  - `BaseHTTPMiddleware` for custom middleware (logging, metrics, request IDs)
You don’t normally import Starlette directly; instead, FastAPI re‑exports many of the pieces.
- FastAPI sits on top of Starlette and adds:
  - Super convenient routing with decorators like `@router.get`, `@router.post`.
  - Dependency injection (the `Depends(...)` system).
  - Request body parsing and response models via Pydantic.
  - Automatic OpenAPI schema and interactive docs (`/docs`).
- In this project, the main FastAPI setup is:
  - `app/fastapi_app.py` → exposes the `app` object for `uvicorn`.
  - `app/core/app_factory.py` → builds and configures the FastAPI app:
    - sets metadata (title, version, description)
    - registers middleware
    - includes routers.
The important mental model:
- FastAPI = high‑level framework for HTTP APIs.
- You think in terms of endpoints, request models, response models, and dependencies.
- Pydantic is used to define request and response schemas.
- You define models as Python classes with type hints.
- Pydantic automatically validates and converts incoming JSON into those models.
- In this project:
  - `app/models.py` contains:
    - `JobCreate` – request body for `POST /jobs`.
    - `JobResponse` – response structure when returning job data.
    - `LeaseRequest`, `LeaseResponse`, `AckRequest`, `NackRequest`, `HeartbeatRequest` – request/response models for worker APIs.
    - `JobListResponse` – paginated list for dashboards.
By using Pydantic:
- Endpoints get typed, validated Python objects instead of raw dicts.
- OpenAPI docs are generated automatically from these models.
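As a hedged illustration (the field names here are hypothetical — the real definitions live in `app/models.py`), a request model might look like:

```python
from typing import Any, Dict, Optional
from pydantic import BaseModel

# Hypothetical sketch of what a request model like JobCreate might look like.
# Field names and defaults are assumptions, not copied from app/models.py.
class JobCreate(BaseModel):
    queue_name: str
    payload: Dict[str, Any]
    priority: int = 0
    max_retries: int = 3
    run_at: Optional[str] = None  # ISO timestamp for delayed jobs

# FastAPI does this conversion for you from the request body; constructing
# the model directly shows the same validation in isolation.
job = JobCreate(queue_name="emails", payload={"to": "a@b.com"})
print(job.priority)  # 0
```

If the JSON is missing `queue_name` or has the wrong types, Pydantic raises a validation error, which FastAPI turns into a `422` response.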
This is the key mental model that will make you feel “pro” when reading the code.
We’ll follow a typical request, for example POST /jobs (enqueue a job).
- `uvicorn` receives an HTTP request from a client.
- It passes it to the ASGI app exposed in `app/fastapi_app.py` → `app = create_app(...)`.
- `create_app` is defined in `app/core/app_factory.py`.
In create_app, middleware is registered:
- RequestIdMiddleware (`app/middleware.py`)
  - Ensures every request has a unique `X-Request-ID`.
  - Stores it on `request.state.request_id`.
- StructuredLoggingMiddleware
- Logs a single structured line per request (method, path, status, duration, client IP, user agent, request ID).
- PrometheusHttpMetricsMiddleware
  - Updates Prometheus counters/histograms:
    - total HTTP requests
    - request duration by method/path.
Conceptually:
- Middleware wraps the request handling.
- They run before and after your endpoint handler.
- FastAPI matches the request to the appropriate router & endpoint function:
  - Producers: `app/routers/jobs.py`
  - Workers: `app/routers/workers.py`
  - Dashboard: `app/routers/dashboard.py`
  - Observability: `app/routers/observability.py`
- Each endpoint declares dependencies using `Depends(...)`, for example:
  - `AuthUserDep` – resolves the current user from the `Authorization: Bearer <token>` header using `app/auth.py`.
  - Rate‑limit dependencies – e.g. `RateLimitEnqueueDep` using `app/deps.py` and `app/rate_limit.py`.
FastAPI automatically:
- Parses the request body into a Pydantic model.
- Resolves dependencies (auth, rate limits, etc.).
- Calls your endpoint function with already validated arguments.
Endpoints in routers are kept intentionally thin:
- They import functions from `app/services/jobs_service.py`, such as: `enqueue_job`, `lease_next`, `ack`, `nack`, `heartbeat`, `list_user_jobs`, `queue_stats`.
The service layer is where you see:
- Business rules (e.g. metrics labels, return shaping).
- Prometheus metrics for operations (enqueued, leased, acked, nacked, DLQ moves, etc.).
- Simple transformations of data from the DB layer into Pydantic models.
This separation gives you a clean mental map:
- Router = HTTP surface + auth + rate limits.
- Service = business rules + metrics.
- CRUD = SQL and DB details.
- `app/crud.py` is where the real SQL lives:
  - Insert jobs (`create_job`).
  - Select jobs (`get_job_status`, `get_job`, `list_jobs`).
  - Lease jobs (`lease_next_job`) with row locking.
  - Update on ack/nack/heartbeat/cancel (`ack_job`, `nack_job`, `heartbeat_job`, `cancel_job`).
  - Queue stats (`get_queue_stats`).
- The CRUD layer talks to Postgres via a connection pool provided by:
  - `app/database.py` → a `Database` class that wraps an `asyncpg.Pool`.
When a CRUD function is called:
- It uses `async with db.get_pool() as pool:`
- Then `async with pool.acquire() as conn:` to get a connection.
- Then executes SQL with `conn.fetch`, `conn.fetchrow`, `conn.execute`, etc.
Because all of this is async, multiple requests can share the connection pool efficiently.
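The nesting pattern looks roughly like this sketch, which uses stub pool/connection classes in place of real `asyncpg` objects so it runs without a database:

```python
import asyncio
from contextlib import asynccontextmanager

# Stubs standing in for asyncpg's Pool/Connection, just to show the nesting
# pattern CRUD functions use; the real objects come from asyncpg.
class FakeConn:
    async def fetchrow(self, query, *args):
        return {"id": 1, "status": "pending"}

class FakePool:
    @asynccontextmanager
    async def acquire(self):
        yield FakeConn()

class Database:
    def __init__(self):
        self._pool = None

    @asynccontextmanager
    async def get_pool(self):
        # Lazily create the pool on first use (the real version would call
        # asyncpg.create_pool(dsn) here).
        if self._pool is None:
            self._pool = FakePool()
        yield self._pool

async def get_job(db, job_id):
    async with db.get_pool() as pool:
        async with pool.acquire() as conn:
            return await conn.fetchrow("SELECT * FROM jobs WHERE id = $1", job_id)

row = asyncio.run(get_job(Database(), 1))
print(row["status"])  # pending
```

The method names mirror those described above; the real `Database` class in `app/database.py` is the authoritative version.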
- CRUD returns raw data (e.g. dicts or `asyncpg.Record` objects).
- Service functions convert them into plain dicts or Pydantic models.
- Routers return Pydantic models (or dicts) and FastAPI turns them into JSON responses.
- Middleware wraps the response:
- logging
- metrics
- request ID headers.
From the outside, clients just see:
- A clean JSON API with documented responses.
- OpenQueue uses PostgreSQL as both:
  - a data store (`users`, `jobs` tables),
  - a queue engine using row locks.
- Key ideas:
  - Jobs are rows in a `jobs` table.
  - Leasing uses:
    - `SELECT ... FOR UPDATE SKIP LOCKED` to atomically claim a job.
    - `locked_until`, `locked_by`, `lease_token` columns to track leases.
  - Retry and delayed execution use `run_at` (a timestamp field).
Because everything is in Postgres:
- You get durability and can debug with SQL queries.
- asyncpg is an asynchronous Postgres driver.
- It exposes an `asyncpg.Pool` for connection pooling.
- Queries are `await`‑ed: `await conn.fetch(...)`, `await conn.execute(...)`.
- In `app/database.py`:
  - `Database._ensure_pool` creates a pool lazily using the URL from settings.
  - `get_pool` is an async context manager that yields the pool.
Why async matters:
- While one request is waiting on IO (DB call), the event loop can handle another request.
- This allows a single process to handle many concurrent requests efficiently.
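A toy stdlib‑only example of why this matters: two simulated requests that each wait 0.2s on IO complete in about 0.2s total when run concurrently, not 0.4s:

```python
import asyncio
import time

# Simulate two requests that each spend 0.2s waiting on IO (e.g. a DB call).
async def fake_request(n):
    await asyncio.sleep(0.2)  # non-blocking wait: the event loop stays free
    return n

async def main():
    start = time.monotonic()
    results = await asyncio.gather(fake_request(1), fake_request(2))
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # [1, 2]
print(f"elapsed ≈ {elapsed:.2f}s")  # roughly 0.2s — the two waits overlap
```

The same effect applies to real `asyncpg` calls: while one request awaits Postgres, the loop serves others.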
- Alembic is used to manage DB schema changes over time.
- Migration scripts live under `migrations/`.
- `alembic upgrade head` brings your DB up to the latest schema.
- The CI workflow:
  - Starts Postgres.
  - Runs `alembic upgrade head`.
  - Then runs `pytest`.
This ensures:
- Your schema and code stay in sync in all environments.
- Authentication is API‑token based:
  - Clients send `Authorization: Bearer <token>`.
  - The app hashes the token and looks up a user row in the `users` table.
- Token hashing is done in `app/auth.py`:
  - By default: `sha256(token)`.
  - If `OPENQUEUE_TOKEN_HMAC_SECRET` is set, then: `HMAC-SHA256(secret, token)`.
- Why HMAC?
- If the DB leaks, an attacker cannot easily guess valid tokens without the secret.
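A minimal sketch of the two hashing modes, using only the standard library (the function name and signature are illustrative; the real code is in `app/auth.py`):

```python
import hashlib
import hmac
from typing import Optional

# Illustrative sketch of the two hashing modes described above.
def hash_token(token: str, secret: Optional[str] = None) -> str:
    if secret:
        # Keyed hash: hashes in a leaked DB are useless without the secret.
        return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()
    return hashlib.sha256(token.encode()).hexdigest()

plain = hash_token("my-token")
keyed = hash_token("my-token", secret="s3cret")
print(plain != keyed)  # True — same token, different stored value
```

Either way the hash is deterministic, so lookups are a simple equality match on the stored column.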
- FastAPI’s `HTTPBearer` security scheme parses the `Authorization` header.
- `get_current_user`:
  - extracts the token,
  - hashes it,
  - looks up the user in DB,
  - ensures `is_active` is true,
  - updates `last_seen_at`.
- Routers use `AuthUserDep` from `app/deps.py` so every authenticated endpoint automatically has the current user.
Mentally:
- “Current user” is always available as a dict (`{"id": ..., "email": ..., "is_active": ...}`) to endpoint and service code.
- `app/rate_limit.py` defines:
  - `RateLimit` – a rate + burst (tokens/sec, bucket size).
  - `RateLimiter` – a token‑bucket implementation keyed by (principal, action).
  - `DEFAULT_LIMITS` – per‑action defaults: `enqueue`, `lease`, `ack`, `nack`, `heartbeat`, `list_jobs`, `queue_stats`.
- `app/deps.py` wires this into FastAPI dependencies: `RateLimitEnqueueDep`, `RateLimitLeaseDep`, etc.
- Each request:
- Computes a principal key (usually the user id).
- Consumes tokens from the appropriate bucket.
  - Raises `429 Too Many Requests` if the bucket is empty, with a `Retry-After` header.
This protects:
- The API from abusive or buggy clients.
- The database from being flooded with operations.
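A minimal token‑bucket sketch (illustrative only — the real `RateLimiter` in `app/rate_limit.py` is keyed by principal and action and has its own API):

```python
import time

# Minimal token bucket: `rate` tokens refill per second, capped at `burst`.
# Each request consumes one token; an empty bucket means "rate limited".
class TokenBucket:
    def __init__(self, rate: float, burst: float):
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.updated = time.monotonic()

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, never above the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=1.0, burst=2.0)
results = [bucket.allow() for _ in range(3)]
print(results)  # [True, True, False] — burst of 2, then throttled
```

The burst allows short spikes; the steady rate bounds sustained throughput per (principal, action).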
- `app/rate_limit.py` also provides:
  - `enforce_json_size_guardrail` – checks raw request body length against a limit.
- `app/settings.py` defines:
  - `max_enqueue_payload_bytes`
  - `max_result_payload_bytes`
  - `max_error_text_bytes`
These settings let you control how big requests and results can be.
Understanding this makes the whole system “click”.
- Client calls `POST /jobs` with a `JobCreate` payload.
- Auth + rate limiting run.
- `jobs_service.enqueue_job` is called.
- `crud.create_job` inserts a row into `jobs`:
  - `status = 'pending'`
  - `run_at = NOW()` by default
  - `retry_count = 0`, `max_retries` from request
  - `priority` from request.
- Metrics counter `JOBS_ENQUEUED_TOTAL` is incremented.
Result: job row appears in DB and is now leaseable.
- Worker sends a `LeaseRequest` (worker id, lease seconds).
- Auth + rate limiting run.
- `jobs_service.lease_next` calls `crud.lease_next_job`.
- `lease_next_job`:
  - Finds one qualifying job:
    - `status = 'pending' AND run_at <= NOW()`, or
    - `status = 'processing' AND locked_until < NOW()` (expired lease).
  - Orders by:
    - `priority DESC`
    - `created_at ASC` (FIFO inside priority).
  - Uses `FOR UPDATE SKIP LOCKED` to avoid race conditions.
  - Updates:
    - `status = 'processing'`
    - `locked_by = worker_id`
    - `locked_until = NOW() + lease_seconds`
    - `lease_token = gen_random_uuid()`
    - `lease_lost_count` if it was expired.
- `LeaseResult` is returned and then exposed as a `LeaseResponse` to the worker.
If there is no job, the endpoint returns null.
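The leasing query has roughly this shape — an approximation assembled from the columns named in this guide, not the exact SQL from `crud.lease_next_job`:

```python
# Approximate shape of the leasing query described above. Column and table
# names follow this guide's schema; the authoritative SQL is in app/crud.py.
LEASE_SQL = """
WITH candidate AS (
    SELECT id
    FROM jobs
    WHERE (status = 'pending' AND run_at <= NOW())
       OR (status = 'processing' AND locked_until < NOW())
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'processing',
    locked_by = $1,
    locked_until = NOW() + make_interval(secs => $2),
    lease_token = gen_random_uuid()
FROM candidate
WHERE jobs.id = candidate.id
RETURNING jobs.*;
"""

print("FOR UPDATE SKIP LOCKED" in LEASE_SQL)  # True
```

`SKIP LOCKED` is the key: concurrent workers running this query never block on each other and never claim the same row, because a row locked by one transaction is simply skipped by the others.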
- Worker sends `AckRequest` with the `lease_token` and optional result JSON.
- Auth + rate limiting run.
- `jobs_service.ack` calls `crud.ack_job`.
- `ack_job` updates the row only if `id`, `user_id`, `status = 'processing'`, and `lease_token` all match.
- If the update succeeds:
  - `status = 'completed'`
  - `result` is stored
  - `finished_at` is set
  - lease metadata is cleared.
- Metrics: `JOBS_ACKED_TOTAL` is incremented.
If anything doesn’t match (wrong token, wrong status, wrong user), the endpoint returns `409 Conflict`.
- Worker sends `NackRequest` with `lease_token`, `error` string, `retry` bool.
- Auth + rate limiting run.
- `jobs_service.nack` calls `crud.nack_job`.
- `nack_job`:
  - Checks that the job is `processing` and the `lease_token` matches.
  - Reads `retry_count` and `max_retries`.
  - If `retry=True` and retries remain:
    - `status = 'pending'`
    - `retry_count += 1`
    - `run_at = NOW() + backoff_seconds` (exponential backoff)
    - lease metadata is cleared.
  - Else (no retries left or `retry=False`):
    - `status = 'dead'`
    - `dead_reason` and `dead_at` set
    - `error_text` updated
    - `finished_at` set.
- Metrics:
  - `JOBS_NACKED_TOTAL` increments.
  - `JOBS_MOVED_TO_DLQ_TOTAL` increments when the job moves to the DLQ.
Result: job is either requeued for later or sent to the DLQ.
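The backoff computation can be sketched like this (the actual base and cap used by `nack_job` are assumptions here):

```python
# Illustrative exponential backoff with a cap. The real base/cap values used
# by nack_job may differ — these numbers are assumptions for the sketch.
def backoff_seconds(retry_count: int, base: float = 2.0, cap: float = 3600.0) -> float:
    return min(cap, base ** retry_count)

delays = [backoff_seconds(n) for n in range(5)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Doubling the delay on each retry keeps transient failures cheap while preventing a persistently failing job from hammering the workers; the cap bounds the worst-case wait.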
- Worker sends `HeartbeatRequest` (lease token, new lease seconds).
- Auth + rate limiting run.
- `jobs_service.heartbeat` calls `crud.heartbeat_job`.
- `heartbeat_job`:
  - Updates `locked_until = NOW() + lease_seconds`
  - Only if `status = 'processing'` and the `lease_token` matches.
This keeps long‑running jobs from being re‑leased to another worker.
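On the worker side, a heartbeat loop might be structured like this sketch, where `send_heartbeat` is a stand‑in for the HTTP call to the heartbeat endpoint:

```python
import asyncio

# Sketch of a worker-side heartbeat loop: while the job handler runs, renew
# the lease at a fraction of the lease duration. `send_heartbeat` is a
# placeholder for the real HTTP call to the heartbeat endpoint.
async def run_with_heartbeat(handler, send_heartbeat, lease_seconds: float):
    async def beat():
        while True:
            await asyncio.sleep(lease_seconds / 3)  # renew well before expiry
            await send_heartbeat(lease_seconds)

    hb = asyncio.create_task(beat())
    try:
        return await handler()
    finally:
        hb.cancel()  # stop renewing once the job finishes (or fails)

# Demo with stubs: a "job" that takes 0.3s and a counter for heartbeats.
beats = []

async def fake_heartbeat(lease_seconds):
    beats.append(lease_seconds)

async def fake_job():
    await asyncio.sleep(0.3)
    return "done"

result = asyncio.run(run_with_heartbeat(fake_job, fake_heartbeat, lease_seconds=0.3))
print(result)           # done
print(len(beats) >= 1)  # True — the lease was renewed while the job ran
```

Renewing at a third of the lease duration gives the worker two chances to heartbeat before the lease would expire and the job became leaseable again.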
- Metrics:
  - Defined in `app/metrics.py`.
  - Exposed at `/metrics` via `app/routers/observability.py`.
  - Include:
    - HTTP request counts & latencies.
    - Jobs enqueued, leased, acked, nacked, cancelled, DLQ’d.
    - Lease expiry recoveries.
    - Processing duration histograms.
- Health checks:
  - `/health` – liveness (process is up).
  - `/ready` – readiness (can connect to DB and run `SELECT 1`).
These are what you’d hook up to Kubernetes, monitoring, dashboards, and alerts.
- `app/maintenance.py` defines:
  - `reap_expired_leases` – policy hook to clean up jobs stuck in `processing`.
  - `delete_old_jobs` – delete jobs of certain statuses older than a TTL.
  - `run_maintenance_once` – run one iteration of all tasks.
  - `maintenance_loop` – long‑running async loop to run maintenance periodically.
- In production, you typically:
- Run maintenance in a separate worker process or container.
  - Configure intervals and retention with `MaintenanceConfig`.
- `pyproject.toml` configures pytest:
  - Async tests with `pytest-asyncio`.
  - Test paths in `tests/`.
- A key test is `tests/test_concurrency_leasing.py`:
  - Boots a minimal schema (or uses your migrations).
  - Enqueues 100 jobs.
  - Uses many concurrent “workers” to lease jobs.
  - Asserts that:
    - There are no duplicate leases.
    - All jobs end up leased exactly once.
This gives strong evidence that the leasing algorithm is correct under concurrency.
- `.github/workflows/test.yml`:
  - Starts Postgres as a service.
  - Sets `DATABASE_URL` and `OPENQUEUE_TOKEN_HMAC_SECRET`.
  - Installs dependencies.
  - Runs Alembic migrations.
  - Runs pytest.
When CI is green, you know:
- DB schema + migrations + code + tests all agree.
To go from “understands the guide” to “pro at this codebase”, here’s a suggested path:
- Step 1 – Trace one happy path end‑to‑end
  - Start at `POST /jobs` in `app/routers/jobs.py`.
  - Follow calls through `services.jobs_service`, then `crud`, then skim the SQL.
  - Check the `jobs` table definition in migrations.
- Step 2 – Trace one worker path
  - `POST /queues/{queue_name}/lease` → lease a job.
  - `POST /jobs/{job_id}/ack` → complete a job.
  - `POST /jobs/{job_id}/nack` → DLQ behavior.
- Step 3 – Explore cross‑cutting concerns
  - Read `app/auth.py` and `app/deps.py` to fully grok auth and rate limiting.
  - Read `app/middleware.py` + `app/metrics.py` to understand logging/metrics.
- Step 4 – Run and observe
- Run the app locally.
  - Use the `/docs` UI to enqueue and lease jobs.
  - Watch logs and `/metrics` output to see it all in action.
After you can comfortably navigate those flows without re‑opening this guide, you’ve effectively “mastered” the architecture and core concepts of this project.