Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
918c5ab
feat: Add bounded-concurrency env knobs for the workflow pipeline
conradbzura Jun 22, 2026
67e1d40
feat: Add per-worker backpressure for the wool worker pool
conradbzura Jun 22, 2026
8823a31
feat: Add a priority load balancer for the workflow worker pool
conradbzura Jun 22, 2026
5ff27db
feat: Add JobRecord dispatch-scheduling fields and lock queries
conradbzura Jun 22, 2026
de19cae
fix: Harden dispatch-scheduling lock helpers per review
conradbzura Jun 22, 2026
2e7925a
feat: Index the durable retry scheduler's due-dispatch lease query
conradbzura Jun 22, 2026
d7a63fe
test: Cover the dispatch-scheduling lock helpers and JobRecord fields
conradbzura Jun 22, 2026
c8eae8d
test: Harden load-balancer, backpressure, and wiring coverage
conradbzura Jun 22, 2026
d6c5309
feat: Drive workflow dispatch with a durable bounded-concurrency sche…
conradbzura Jun 22, 2026
c60f6ec
feat: Surface admission rejection as 429 and start the retry scheduler
conradbzura Jun 22, 2026
8480a65
docs: Document bounded-concurrency, durable queuing, and admission co…
conradbzura Jun 22, 2026
db6529e
fix: Mark a workflow RUNNING on worker acceptance to stop double-disp…
conradbzura Jun 22, 2026
23ec652
feat: Autonomously recover workflows orphaned by an API or worker crash
conradbzura Jun 23, 2026
26f0ab4
fix: Resolve review-2 findings in the workflow executor
conradbzura Jun 23, 2026
a263f5c
test: Tighten test-double fidelity, mutex predicate, and naming per r…
conradbzura Jun 23, 2026
9a2e4db
docs: Clarify admission-ceiling attach behavior and 202 queuing
conradbzura Jun 23, 2026
779b060
fix: Resolve review-3 advisories in the workflow dispatch layer
conradbzura Jun 23, 2026
2176903
test: Add review-3 coverage and tighten test fidelity
conradbzura Jun 23, 2026
d841aa4
docs: Note recovered jobs share the from-submission dispatch deadline
conradbzura Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,13 @@ Stream file contents from DCCs via HTTPS.
| Code | Description |
|------|-------------|
| 200 | Full file content (GET) or file metadata (HEAD) |
| 202 | Preprocessed artifact not yet cached — workflow dispatched. `Location: /jobs/{id}` and `Retry-After` headers point to the polling endpoint. |
| 202 | Preprocessed artifact not yet cached — workflow accepted. `Location: /jobs/{id}` and `Retry-After` headers point to the polling endpoint. Under load the job may sit `pending` in the durable queue before a worker picks it up; poll `/jobs/{id}` for progress. |
| 206 | Partial content (Range request) |
| 400 | Invalid DCC, path-param shape, or Range header |
| 403 | File requires authentication (consortium/protected access) or denied by upstream repository |
| 404 | File not found, or HEAD probe of a not-yet-cached processed artifact (GET would dispatch) |
| 416 | Range not satisfiable (out of bounds, or file size unknown so no range can be satisfied) |
| 429 | Too many active preprocessing jobs — the active-workflow ceiling (`CFDB_WORKFLOW_MAX_ACTIVE`) is reached; `Retry-After` header set. Retry shortly. |
| 501 | No supported access method (e.g., Globus-only files) |
| 502 | Upstream service error |
| 503 | Workflow subsystem shutting down (`Retry-After`) |
Expand Down Expand Up @@ -586,12 +587,13 @@ Stream index files (e.g., `.px2`, `.bai`) associated with DCC data files.
| Code | Description |
|------|-------------|
| 200 | Full index file content (GET) or file metadata (HEAD) |
| 202 | Index not yet cached — workflow dispatched. `Location: /jobs/{id}` and `Retry-After` headers point to the polling endpoint. |
| 202 | Index not yet cached — workflow accepted. `Location: /jobs/{id}` and `Retry-After` headers point to the polling endpoint. Under load the job may sit `pending` in the durable queue before a worker picks it up; poll `/jobs/{id}` for progress. |
| 206 | Partial content (Range request) |
| 400 | Invalid DCC, path-param shape, or Range header |
| 403 | File requires consortium/protected access (HuBMAP) |
| 404 | File not found, format has no index (CSV/TSV/bigWig), or HEAD probe of a not-yet-cached index |
| 416 | Range not satisfiable |
| 429 | Too many active preprocessing jobs — the active-workflow ceiling (`CFDB_WORKFLOW_MAX_ACTIVE`) is reached; `Retry-After` header set. Retry shortly. |
| 502 | Upstream service error or malformed sidecar |
| 503 | Workflow subsystem disabled (set `SYNC_DATA_DIR`) for a processable format, or shutting down (`Retry-After`) |

Expand Down Expand Up @@ -651,6 +653,13 @@ The preprocessed artifact is the default response. Clients that want the raw ups

Cache keys are content-addressed using each file's upstream `md5`, so a byte change upstream (with the sync pipeline refreshing `md5`) invalidates the cache automatically.

**Bounded concurrency, durable queuing, and admission control.** Dispatch is bounded on three cooperating layers so an unauthenticated burst on `/data` and `/index` can't oversubscribe the worker fleet or queue unbounded work:

- **Per-worker backpressure** — each worker accepts at most `CFDB_WORKER_MAX_CONCURRENT_TASKS` tasks at once (default `1`), serializing the subprocess pipelines on a 1-vCPU worker. A worker at capacity rejects the dispatch and the API's priority load balancer rotates to the next worker.
- **Priority (leaky-bucket) load balancing** — the API offers each task to discovered workers in a stable order, so load concentrates on the lowest-ordered workers and over-provisioned workers drain to idle and self-reap (via `CFDB_WORKER_MAX_LIFETIME_SECONDS`) instead of every worker carrying a thin perpetual slice.
- **Durable queue + retry-to-deadline** — when no worker has capacity, the job is **not** failed and does **not** block the request: it stays `pending` and a durable, Mongo-backed scheduler re-attempts dispatch every `CFDB_WORKFLOW_RETRY_INTERVAL_S` (plus jitter) until a worker frees up or the `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` deadline elapses (then it is failed with a `capacity:`-prefixed error). Because the queue lives in Mongo, an API restart resumes it. On every scheduler tick (including the first, on boot) an orphan-recovery sweep re-queues jobs a crash left mid-flight — a `running` job whose API consumer died, or a fresh `pending` claim that never rescheduled — once they pass the stale threshold (`CFDB_WORKFLOW_STALE_THRESHOLD_S`), so recovery is autonomous and does not wait for a client to re-request the file. Recovery shares the same deadline clock as a fresh job: the re-queue preserves the original submission time, so an orphan older than `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` is failed `capacity:` on its first recovery attempt rather than resumed (its committed cache artifacts survive for a later fresh `GET` to reuse) — recovery is best-effort, not unbounded. On the ECS profile, an overflow also requests one bounded worker spawn (the leaky bucket overflowing), inverting the old unconditional per-request spawn.
- **Admission ceiling** — once `CFDB_WORKFLOW_MAX_ACTIVE` workflows are active (`pending` + `running`), further preprocessing requests are shed with `429 Retry-After` rather than queued, so the backlog itself is bounded. The check runs before the per-file mutex, so at the ceiling even a re-`GET` for a file whose workflow is already in flight is shed with `429` (rather than attaching to the in-flight job) and the client retries — the deliberate trade for shedding before an unbounded admission race. The readiness `/status` probes never dispatch and so never `429`.

`/index` continues to serve upstream sidecars first when present (the 218 BED→beddb and 4 BED→tbi 4DN cases that publish under `extra.extra_files` or `extra.fourdn.extra_files`); the workflow path is dispatched only when no sidecar exists. Set `?raw=true` to bypass the workflow path entirely and return only the upstream sidecar (404 when none exists).

Required environment variables:
Expand All @@ -659,10 +668,16 @@ Required environment variables:
- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), so the **maximum concurrent worker count is bounded by the AWS Fargate vCPU service quota** for the account/region — raise it in Service Quotas; at 1 vCPU per worker it maps roughly 1:1 to concurrent workers. A burst of N distinct uncached files can launch up to ~N workers concurrently, subject to that quota.
- `WORKFLOW_POOL_NAMESPACE` — wool discovery namespace shared by the API and the external worker pool (default `cfdb-workers`). Both processes must agree on this value or dispatch will hang waiting for workers.

Bounded-concurrency control (issue #45):

- `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process.
- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
- `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**.
- `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**.

Optional tunables (with defaults):

- `CFDB_WORKFLOW_DURATION_CAP_S` — per-workflow wall-clock cap (default `14400`, i.e. 4 h — sized for multi-hour preprocessing runs; lower it for fixture-bound dev).
- `CFDB_WORKFLOW_DISPATCH_WAIT_S` — how long `ensure_workflow` waits for a free worker before giving up (default `240`, i.e. 4 min — sized for an ECS Fargate cold start: image pull + health check typically take 1-3 min, so a smaller budget can expire before a freshly-provisioned worker reports HEALTHY and hard-fail the first request. Lower it for fixture-bound dev where workers are already running).
- `CFDB_WORKFLOW_HEARTBEAT_INTERVAL_S` — cadence at which the wool routine emits heartbeat events during quiet stages so the API can refresh `JobRecord.updated_at` (default `300`). The stale-reclaim threshold below is sized as `2 × heartbeat + safety_margin`; lowering this knob without also lowering the threshold widens the false-reclaim window.
- `CFDB_WORKFLOW_STALE_THRESHOLD_S` — `updated_at` age beyond which an active row is reclaimable (default `900`; sized as `2 × heartbeat_interval + safety_margin` so a single missed heartbeat does not falsely reclaim a healthy worker).
- `CFDB_SAMTOOLS_THREADS` (default `1`), `CFDB_SORT_PARALLEL` (default `2`) — CPU/thread caps for `samtools sort/index` and GNU `sort` respectively.
Expand Down
11 changes: 11 additions & 0 deletions scripts/create-indexes.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ ensureIndex(
{ name: "status_updated_at" }
);

// Serves the durable retry scheduler's due-dispatch lease
// (workflows.lock.lease_due_dispatch): { status: "pending",
// next_dispatch_at: { $lte: now } } sorted by next_dispatch_at asc. The
// status equality prefix plus the next_dispatch_at range/sort are both
// index-served, so the per-tick poll is not a scan of all PENDING rows.
ensureIndex(
db.jobs,
{ status: 1, next_dispatch_at: 1 },
{ name: "status_next_dispatch_at" }
);

// TTL index on terminal job rows. Without this, every stale-reclaim
// transition leaves a permanent ``failed`` document and the collection
// grows unbounded for any frequently-touched workflow_key. The partial
Expand Down
28 changes: 22 additions & 6 deletions src/cfdb/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from cfdb.workflows.credentials import build_worker_credentials
from cfdb.workflows.discovery import EcsDiscovery
from cfdb.workflows.executor import WoolExecutor
from cfdb.workflows.loadbalancer import PriorityLoadBalancer
from cfdb.workflows.processors.bam import BamIndexProcessor
from cfdb.workflows.processors.registry import default_registry
from cfdb.workflows.processors.tabix import TabixIntervalProcessor
Expand Down Expand Up @@ -329,19 +330,27 @@ async def lifespan(_: FastAPI):
# dispatch — fired while the Fargate worker is still booting
# (image pull + health check, ~1-3 min) — blow the quorum
# wait and raise an un-retried ``TimeoutError`` out of
# ``WorkerPool.start``. cfdb's ``_open_stream_with_retry``
# only retries ``wool.NoWorkersAvailable``, so that
# ``WorkerPool.start``. cfdb's dispatch attempt only treats
# ``wool.NoWorkersAvailable`` as "no capacity", so that
# ``TimeoutError`` fails the first request hard (and the
# provisioner's cancelled ``RunTask`` leaks an idle worker).
# With the gate off, a cold-start dispatch surfaces
# ``NoWorkersAvailable`` instead, which the executor already
# absorbs within its own ``_DISPATCH_WAIT_SECONDS`` budget —
# keeping the cold-start wait in one layer cfdb owns rather
# than duplicating it across wool's quorum wait too.
# ``NoWorkersAvailable`` instead, which the executor absorbs
# by leaving the job PENDING and re-attempting it on the
# durable retry scheduler's cadence — keeping the cold-start
# wait in one layer cfdb owns rather than duplicating it
# across wool's quorum wait too.
async with wool.WorkerPool(
discovery=discovery,
credentials=worker_credentials,
quorum=0,
# Priority/leaky-bucket balancing: always offer a task to
# discovered workers in the same stable order. With
# per-worker backpressure (one task each), load fills the
# lowest-ordered workers first and higher-ordered ones
# drain to idle and self-reap, instead of every worker
# carrying a thin perpetual slice (issue #45).
loadbalancer=PriorityLoadBalancer(),
):
# Snapshot the lifespan task's contextvars after the
# pool's ``__aenter__`` has populated wool's internals.
Expand All @@ -354,6 +363,13 @@ async def lifespan(_: FastAPI):
provisioner=provisioner,
)
executor_handle = api.executor
# Start the durable retry scheduler — the single dispatch
# driver — here, inside the pool context, so the created
# task inherits wool's dispatch contextvars (same as
# request-spawned tasks via ``attach_wool_context``).
# ``_drain_executor`` cancels it on teardown before the
# pool closes.
api.executor.start_scheduler()
log.info(
"Workflow subsystem enabled: profile=%s cache=%s "
"workdir=%s discovery=%s provisioner=%s "
Expand Down
18 changes: 17 additions & 1 deletion src/cfdb/api/routers/cache_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
from cfdb import api
from cfdb.services import drs
from cfdb.workflows.cache import CacheBackend
from cfdb.workflows.executor import ExecutorDraining, WorkflowNotApplicable
from cfdb.workflows.executor import (
AdmissionRejected,
ExecutorDraining,
WorkflowNotApplicable,
)
from cfdb.workflows.models import ArtifactKind

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -275,6 +279,18 @@ async def serve_workflow_artifact_or_dispatch(
detail="Service is shutting down; please retry",
headers={"Retry-After": "30"},
)
except AdmissionRejected as exc:
# The active-workflow ceiling is hit — shed this request with a
# 429 so the client backs off rather than queuing unbounded work.
# NOT a WorkflowNotApplicable subclass, so it must be caught
# explicitly (before that handler) to avoid falling through to
# direct upstream streaming, which would bypass the bounded
# pipeline entirely.
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many active preprocessing jobs; please retry shortly",
headers={"Retry-After": str(exc.retry_after_seconds)},
)
except WorkflowNotApplicable:
# Race: processor accepted this file, but executor disagreed.
# Fall through to the caller's own path rather than 500.
Expand Down
10 changes: 10 additions & 0 deletions src/cfdb/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ def operational_index_specs() -> list[IndexSpec]:
),
IndexSpec("jobs", [("job_id", 1)], name="job_id_unique", unique=True),
IndexSpec("jobs", [("status", 1), ("updated_at", 1)], name="status_updated_at"),
# Serves the durable retry scheduler's due-dispatch lease
# (workflows.lock.lease_due_dispatch): {status: pending,
# next_dispatch_at: {$lte: now}} sorted by next_dispatch_at asc.
# The status equality prefix + next_dispatch_at range/sort are
# both index-served, so the per-tick poll is not a PENDING scan.
IndexSpec(
"jobs",
[("status", 1), ("next_dispatch_at", 1)],
name="status_next_dispatch_at",
),
# TTL on terminal rows so the collection doesn't grow unbounded.
# The partial filter excludes active rows (active=True) so an
# in-flight job is never reaped. 7 days gives operators a window
Expand Down
Loading
Loading