Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Requires Python 3.11 or later.
|----------|-------------|---------|
| `SYNC_API_KEY` | API key for the sync endpoint. If unset, sync is unprotected (suitable for local dev). | - |
| `SYNC_DATA_DIR` | Root directory for downloaded sync data, the workflow cache (`$SYNC_DATA_DIR/cache`), and per-job workdirs (`$SYNC_DATA_DIR/jobs`). When unset the preprocessing/indexing workflow subsystem is disabled: `/data` falls through to direct upstream streaming, `/index` returns 503 for processable formats (BAM/VCF/etc.) and 404 for passthrough formats (CSV/TSV/bigWig). Both subdirectories must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity. | - |
| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker, so in the ECS profile concurrency is bounded by the AWS Fargate vCPU service quota. | `2` |
| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker. In the ECS profile the concurrent worker fleet is bounded by `ECS_MAX_WORKERS`, with the AWS Fargate vCPU service quota as the hard ceiling. | `2` |
| `ECS_MAX_WORKERS` | ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks. The provisioner counts the fleet before each `RunTask` and queues rather than spawning past this, so it bounds workers without limiting the queue (that is `CFDB_WORKFLOW_MAX_ACTIVE`). `0` disables the cap. | `16` |
| `WORKFLOW_POOL_NAMESPACE` | wool LAN discovery namespace shared by the API and the worker-pool process. Both processes MUST set the same value or dispatch will hang on `NoWorkersAvailable`. | `cfdb-workers` |
| `CFDB_WORKER_TLS_CA` | Path to the shared CA certificate for the wool worker gRPC channel. When this and the cert/key below are all set, the API↔worker dispatch channel uses mutual TLS (`mutual=True`); when all three are unset the channel stays plaintext. Partial config fails fast at startup. The API and every worker MUST use certs signed by the same CA. See [Worker mTLS](#worker-mtls). | - |
| `CFDB_WORKER_TLS_CERT` | Path to this process's PEM certificate on the worker gRPC channel — the worker leaf cert on a worker (`worker_main`/`worker_lan`), the API client cert on the API. Must be signed by `CFDB_WORKER_TLS_CA`. | - |
Expand Down Expand Up @@ -665,13 +666,14 @@ Cache keys are content-addressed using each file's upstream `md5`, so a byte cha
Required environment variables:

- `SYNC_DATA_DIR` — directory under which the workflow cache and per-job workdirs live. Both subdirectories (`$SYNC_DATA_DIR/cache` and `$SYNC_DATA_DIR/jobs`) must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity; the API asserts this at startup and fails fast if they live on different volumes. When unset, the workflow subsystem is disabled, `/data` falls through to direct upstream streaming, `/index` returns 404 for passthrough formats (CSV/TSV/bigWig — there is no index in any state of the world), and `/index` returns 503 for processable formats that would otherwise dispatch a workflow (sidecar-served files still work).
- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), so the **maximum concurrent worker count is bounded by the AWS Fargate vCPU service quota** for the account/region — raise it in Service Quotas; at 1 vCPU per worker it maps roughly 1:1 to concurrent workers. A burst of N distinct uncached files can launch up to ~N workers concurrently, subject to that quota.
- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), and the concurrent worker fleet is bounded by `ECS_MAX_WORKERS` (default `16`; see the bounded-concurrency knobs below) with the **AWS Fargate vCPU service quota** as the hard ceiling above it. A burst of N distinct uncached files launches up to `min(N, ECS_MAX_WORKERS)` workers; the rest queue.
- `WORKFLOW_POOL_NAMESPACE` — wool discovery namespace shared by the API and the external worker pool (default `cfdb-workers`). Both processes must agree on this value or dispatch will hang waiting for workers.

Bounded-concurrency control (issue #45):

- `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process.
- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs combined — i.e. the **queue depth plus running**, not the worker count). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). To cap the worker fleet without limiting the queue, use `ECS_MAX_WORKERS` instead. Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts running/starting worker tasks (`list_tasks`) and skips the spawn when at the cap, so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up — it does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Soft cap — `list_tasks` eventual-consistency lag plus a count-then-spawn race can briefly overshoot. Set on the **API**.
- `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**.
- `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**.

Expand Down
16 changes: 16 additions & 0 deletions cloudformation/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ Parameters:
Type: Number
Default: 1
Description: Number of ECS tasks
EcsMaxWorkers:
Type: Number
Default: 16
Description: >
Cap on concurrently-running ephemeral Fargate worker tasks, set as
ECS_MAX_WORKERS. Before each RunTask the provisioner counts running
worker tasks and skips the spawn when at this cap, so the worker fleet
is bounded while excess jobs stay queued (the durable scheduler
dispatches them as workers free up). This bounds WORKERS, not the
queue — the queue/admission bound is CFDB_WORKFLOW_MAX_ACTIVE. 0
disables the cap (rely on the Fargate vCPU quota).
HostedZoneName:
Type: String
Default: vis-api.link
Expand Down Expand Up @@ -237,6 +248,11 @@ Resources:
Value: !Ref WorkflowS3Prefix
- Name: AWS_REGION
Value: !Ref AWS::Region
# Cap on concurrently-running ephemeral worker tasks: the
# provisioner counts the fleet before each RunTask and queues
# rather than spawning past this. Bounds workers, not the queue.
- Name: ECS_MAX_WORKERS
Value: !Ref EcsMaxWorkers
# The cert PEMs (when mTLS is enabled) are injected as env vars
# and written to files by the image entrypoint
# (cfdb-tls-entrypoint.sh), which points CFDB_WORKER_TLS_CA/CERT/KEY
Expand Down
11 changes: 11 additions & 0 deletions src/cfdb/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ def _parse_assign_public_ip(name: str, default: str) -> str:
"ECS_WORKER_ASSIGN_PUBLIC_IP", "DISABLED"
)

#: Cap on concurrently-running ephemeral worker tasks on the ECS profile.
#: Before each RunTask the provisioner counts running/starting worker tasks
#: and skips the spawn when already at this cap, so the worker fleet is
#: bounded while excess jobs stay queued (the durable scheduler dispatches
#: them as workers free up — no shedding; the queue is bounded separately by
#: CFDB_WORKFLOW_MAX_ACTIVE). ``0`` disables the cap (rely on the Fargate
#: vCPU quota). Default 16 so an unconfigured ECS deployment fails safe to a
#: small fleet rather than the account quota. Applies only to the ECS
#: profile (no provisioner exists in the local/LAN profile).
ECS_MAX_WORKERS: Final = _parse_int_env("ECS_MAX_WORKERS", 16, minimum=0)

db: AsyncIOMotorDatabase | None = None
cache: "CacheBackend | None" = None
executor: "JobExecutor | None" = None
Expand Down
2 changes: 2 additions & 0 deletions src/cfdb/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ def _build_provisioner(profile: WorkflowProfile) -> Optional[EcsProvisioner]:
return EcsProvisioner(
cluster=profile.ecs.cluster,
task_definition=profile.ecs.task_definition,
task_family=profile.ecs.task_family,
subnets=profile.ecs.subnets,
security_groups=profile.ecs.security_groups,
assign_public_ip=profile.ecs.assign_public_ip,
max_workers=profile.ecs.max_workers,
endpoint_url=profile.aws_endpoint_url,
region_name=profile.aws_region,
)
Expand Down
2 changes: 2 additions & 0 deletions src/cfdb/api/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class _EcsConfig:
subnets: tuple[str, ...]
security_groups: tuple[str, ...]
assign_public_ip: AssignPublicIp
max_workers: int


@dataclass(frozen=True)
Expand Down Expand Up @@ -152,4 +153,5 @@ def _ecs_config_from_env() -> Optional[_EcsConfig]:
subnets=tuple(api.ECS_WORKER_SUBNETS),
security_groups=tuple(api.ECS_WORKER_SECURITY_GROUPS),
assign_public_ip=api.ECS_WORKER_ASSIGN_PUBLIC_IP,
max_workers=api.ECS_MAX_WORKERS,
)
67 changes: 67 additions & 0 deletions src/cfdb/workflows/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ class EcsProvisioner:
max_in_flight: Soft cap on concurrent ``RunTask`` calls. ECS's
``RunTask`` API is rate-limited to ~20 req/s per account;
this guard keeps us well under it.
task_family: Task-definition family used to count the current
worker fleet via ``list_tasks`` for the ``max_workers`` cap.
Defaults to ``task_definition`` with any ``:revision`` suffix
stripped.
max_workers: Cap on concurrently-running worker tasks. Before each
``RunTask`` the provisioner counts running/starting worker tasks
and skips the spawn when already at this cap, so the worker
fleet is bounded while excess jobs stay queued (the durable
scheduler dispatches them as workers free up — no shedding).
``0`` disables the cap (rely on the Fargate vCPU quota). Soft:
``list_tasks`` eventual-consistency lag plus a count-then-spawn
race across distinct workflow keys can briefly overshoot.
"""

def __init__(
Expand All @@ -107,11 +119,15 @@ def __init__(
endpoint_url: Optional[str] = None,
region_name: Optional[str] = None,
max_in_flight: int = 16,
task_family: Optional[str] = None,
max_workers: int = 0,
) -> None:
if not cluster:
raise ValueError("EcsProvisioner requires a cluster name")
if not task_definition:
raise ValueError("EcsProvisioner requires a task_definition")
if max_workers < 0:
raise ValueError(f"max_workers must be >= 0; got {max_workers}")
subnet_list = list(subnets)
if not subnet_list:
raise ValueError("EcsProvisioner requires at least one subnet")
Expand All @@ -128,6 +144,10 @@ def __init__(

self._cluster = cluster
self._task_definition = task_definition
# Family used to count the running fleet via ``list_tasks``; strip
# any ``:revision`` so a pinned task-def revision still matches.
self._task_family = task_family or task_definition.split(":", 1)[0]
self._max_workers = max_workers
self._subnets = subnet_list
self._security_groups = list(security_groups)
self._assign_public_ip = assign_public_ip
Expand Down Expand Up @@ -239,11 +259,58 @@ async def _release_dedup_slot() -> None:
self._in_flight.pop(dedup_key, None)

try:
# Worker-fleet cap: if the fleet is already at the cap, do not
# launch another task. Return an empty ARN list rather than
# raise — the caller (``_handle_overflow``) reschedules the job,
# so it stays queued and runs when an existing worker frees up.
# This bounds the worker-container count while preserving the
# queue (the admission ceiling, not this cap, bounds the queue).
if self._max_workers > 0:
running = await self._current_worker_count()
if running >= self._max_workers:
logger.info(
"Worker fleet at capacity (%d/%d); not spawning for "
"%s — job stays queued until a worker frees",
running,
self._max_workers,
dedup_key,
)
return []
async with self._semaphore:
return await self._run_task()
finally:
await asyncio.shield(_release_dedup_slot())

async def _current_worker_count(self) -> int:
"""Count worker tasks whose desired status is RUNNING.

``list_tasks`` with ``desiredStatus="RUNNING"`` returns tasks still
starting (PROVISIONING / PENDING / ACTIVATING) as well as those
already running, so the count reflects the workers that exist or are
coming up — the right denominator for the fleet cap. A ``list_tasks``
failure raises :class:`RetryableProvisionerError` so the caller
queues the job and retries on the next tick rather than spawning
blind past the cap.

Read-only and cheap, so it runs on the default executor rather than
the owned RunTask pool — ``aclose``'s deterministic drain only needs
to cover the billable RunTask launches. The cap is small (well under
the first ``list_tasks`` page of 100), so the first page is the whole
fleet and pagination is unnecessary to decide whether it is reached.
"""
try:
response = await asyncio.to_thread(
self._client.list_tasks,
cluster=self._cluster,
family=self._task_family,
desiredStatus="RUNNING",
)
except (ClientError, BotoCoreError) as exc:
raise RetryableProvisionerError(
f"list_tasks failed: {type(exc).__name__}: {exc}"
) from exc
return len(response.get("taskArns") or [])

async def _run_task(self) -> list[str]:
"""Single ``RunTask`` invocation translated to a list of task ARNs.

Expand Down
1 change: 1 addition & 0 deletions tests/test_api/test_lifespan_workerpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ async def test_build_discovery_should_yield_ecs_discovery_un_entered(
subnets=("subnet-1",),
security_groups=(),
assign_public_ip="DISABLED",
max_workers=16,
)
profile = profile_mod.WorkflowProfile(
kind="ecs",
Expand Down
Loading
Loading