From 1136553b7cdf71b0c84b9fe0abda05a1fe37e688 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 23 Jun 2026 13:41:15 -0400 Subject: [PATCH] feat: Cap the ephemeral worker fleet at the ECS provisioner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bound the number of concurrent ephemeral Fargate worker tasks without limiting the queue. Before each RunTask the EcsProvisioner counts running/starting worker tasks via list_tasks and skips the spawn when already at ECS_MAX_WORKERS (default 16); the job stays pending and the durable scheduler dispatches it when an existing worker frees, so the worker fleet is bounded while the queue is not. This is deliberately distinct from CFDB_WORKFLOW_MAX_ACTIVE, which bounds queue-plus-running (admission) and sheds with 429 — the worker cap never sheds. The knob flows env (ECS_MAX_WORKERS) -> _EcsConfig -> EcsProvisioner and is exposed as the backend stack's EcsMaxWorkers parameter. 0 disables the cap (rely on the Fargate vCPU quota). Soft cap: list_tasks eventual-consistency lag plus a count-then-spawn race across distinct workflow keys can briefly overshoot. A list_tasks failure raises a retryable error so the job queues and retries rather than spawning blind past the cap. --- README.md | 8 +- cloudformation/backend.yml | 16 ++ src/cfdb/api/__init__.py | 11 ++ src/cfdb/api/main.py | 2 + src/cfdb/api/profile.py | 2 + src/cfdb/workflows/provisioner.py | 67 +++++++++ tests/test_api/test_lifespan_workerpool.py | 1 + tests/test_workflows/test_provisioner.py | 164 ++++++++++++++++++++- 8 files changed, 267 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f7e4ce8..0b31e05 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,8 @@ Requires Python 3.11 or later. |----------|-------------|---------| | `SYNC_API_KEY` | API key for the sync endpoint. If unset, sync is unprotected (suitable for local dev). | - | | `SYNC_DATA_DIR` | Root directory for downloaded sync data, the workflow cache (`$SYNC_DATA_DIR/cache`), and per-job workdirs (`$SYNC_DATA_DIR/jobs`). When unset the preprocessing/indexing workflow subsystem is disabled: `/data` falls through to direct upstream streaming, `/index` returns 503 for processable formats (BAM/VCF/etc.) and 404 for passthrough formats (CSV/TSV/bigWig). Both subdirectories must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity. | - | -| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker, so in the ECS profile concurrency is bounded by the AWS Fargate vCPU service quota. | `2` | +| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker. In the ECS profile the concurrent worker fleet is bounded by `ECS_MAX_WORKERS`, with the AWS Fargate vCPU service quota as the hard ceiling. | `2` | +| `ECS_MAX_WORKERS` | ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks. The provisioner counts the fleet before each `RunTask` and queues rather than spawning past this, so it bounds workers without limiting the queue (that is `CFDB_WORKFLOW_MAX_ACTIVE`). `0` disables the cap. | `16` | | `WORKFLOW_POOL_NAMESPACE` | wool LAN discovery namespace shared by the API and the worker-pool process. Both processes MUST set the same value or dispatch will hang on `NoWorkersAvailable`. | `cfdb-workers` | | `CFDB_WORKER_TLS_CA` | Path to the shared CA certificate for the wool worker gRPC channel. When this and the cert/key below are all set, the API↔worker dispatch channel uses mutual TLS (`mutual=True`); when all three are unset the channel stays plaintext. Partial config fails fast at startup. The API and every worker MUST use certs signed by the same CA. See [Worker mTLS](#worker-mtls). | - | | `CFDB_WORKER_TLS_CERT` | Path to this process's PEM certificate on the worker gRPC channel — the worker leaf cert on a worker (`worker_main`/`worker_lan`), the API client cert on the API. Must be signed by `CFDB_WORKER_TLS_CA`. | - | @@ -665,13 +666,14 @@ Cache keys are content-addressed using each file's upstream `md5`, so a byte cha Required environment variables: - `SYNC_DATA_DIR` — directory under which the workflow cache and per-job workdirs live. Both subdirectories (`$SYNC_DATA_DIR/cache` and `$SYNC_DATA_DIR/jobs`) must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity; the API asserts this at startup and fails fast if they live on different volumes. When unset, the workflow subsystem is disabled, `/data` falls through to direct upstream streaming, `/index` returns 404 for passthrough formats (CSV/TSV/bigWig — there is no index in any state of the world), and `/index` returns 503 for processable formats that would otherwise dispatch a workflow (sidecar-served files still work). -- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), so the **maximum concurrent worker count is bounded by the AWS Fargate vCPU service quota** for the account/region — raise it in Service Quotas; at 1 vCPU per worker it maps roughly 1:1 to concurrent workers. A burst of N distinct uncached files can launch up to ~N workers concurrently, subject to that quota. +- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), and the concurrent worker fleet is bounded by `ECS_MAX_WORKERS` (default `16`; see the bounded-concurrency knobs below) with the **AWS Fargate vCPU service quota** as the hard ceiling above it. A burst of N distinct uncached files launches up to `min(N, ECS_MAX_WORKERS)` workers; the rest queue. - `WORKFLOW_POOL_NAMESPACE` — wool discovery namespace shared by the API and the external worker pool (default `cfdb-workers`). Both processes must agree on this value or dispatch will hang waiting for workers. Bounded-concurrency control (issue #45): - `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process. -- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**. +- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs combined — i.e. the **queue depth plus running**, not the worker count). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). To cap the worker fleet without limiting the queue, use `ECS_MAX_WORKERS` instead. Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**. +- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts running/starting worker tasks (`list_tasks`) and skips the spawn when at the cap, so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up — it does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Soft cap — `list_tasks` eventual-consistency lag plus a count-then-spawn race can briefly overshoot. Set on the **API**. - `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**. - `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**. diff --git a/cloudformation/backend.yml b/cloudformation/backend.yml index d4d479f..06c1e70 100644 --- a/cloudformation/backend.yml +++ b/cloudformation/backend.yml @@ -35,6 +35,17 @@ Parameters: Type: Number Default: 1 Description: Number of ECS tasks + EcsMaxWorkers: + Type: Number + Default: 16 + Description: > + Cap on concurrently-running ephemeral Fargate worker tasks, set as + ECS_MAX_WORKERS. Before each RunTask the provisioner counts running + worker tasks and skips the spawn when at this cap, so the worker fleet + is bounded while excess jobs stay queued (the durable scheduler + dispatches them as workers free up). This bounds WORKERS, not the + queue — the queue/admission bound is CFDB_WORKFLOW_MAX_ACTIVE. 0 + disables the cap (rely on the Fargate vCPU quota). HostedZoneName: Type: String Default: vis-api.link @@ -237,6 +248,11 @@ Resources: Value: !Ref WorkflowS3Prefix - Name: AWS_REGION Value: !Ref AWS::Region + # Cap on concurrently-running ephemeral worker tasks: the + # provisioner counts the fleet before each RunTask and queues + # rather than spawning past this. Bounds workers, not the queue. + - Name: ECS_MAX_WORKERS + Value: !Ref EcsMaxWorkers # The cert PEMs (when mTLS is enabled) are injected as env vars # and written to files by the image entrypoint # (cfdb-tls-entrypoint.sh), which points CFDB_WORKER_TLS_CA/CERT/KEY diff --git a/src/cfdb/api/__init__.py b/src/cfdb/api/__init__.py index b315b3c..a392fee 100644 --- a/src/cfdb/api/__init__.py +++ b/src/cfdb/api/__init__.py @@ -223,6 +223,17 @@ def _parse_assign_public_ip(name: str, default: str) -> str: "ECS_WORKER_ASSIGN_PUBLIC_IP", "DISABLED" ) +#: Cap on concurrently-running ephemeral worker tasks on the ECS profile. +#: Before each RunTask the provisioner counts running/starting worker tasks +#: and skips the spawn when already at this cap, so the worker fleet is +#: bounded while excess jobs stay queued (the durable scheduler dispatches +#: them as workers free up — no shedding; the queue is bounded separately by +#: CFDB_WORKFLOW_MAX_ACTIVE). ``0`` disables the cap (rely on the Fargate +#: vCPU quota). Default 16 so an unconfigured ECS deployment fails safe to a +#: small fleet rather than the account quota. Applies only to the ECS +#: profile (no provisioner exists in the local/LAN profile). +ECS_MAX_WORKERS: Final = _parse_int_env("ECS_MAX_WORKERS", 16, minimum=0) + db: AsyncIOMotorDatabase | None = None cache: "CacheBackend | None" = None executor: "JobExecutor | None" = None diff --git a/src/cfdb/api/main.py b/src/cfdb/api/main.py index 53db36d..7b6831b 100644 --- a/src/cfdb/api/main.py +++ b/src/cfdb/api/main.py @@ -180,9 +180,11 @@ def _build_provisioner(profile: WorkflowProfile) -> Optional[EcsProvisioner]: return EcsProvisioner( cluster=profile.ecs.cluster, task_definition=profile.ecs.task_definition, + task_family=profile.ecs.task_family, subnets=profile.ecs.subnets, security_groups=profile.ecs.security_groups, assign_public_ip=profile.ecs.assign_public_ip, + max_workers=profile.ecs.max_workers, endpoint_url=profile.aws_endpoint_url, region_name=profile.aws_region, ) diff --git a/src/cfdb/api/profile.py b/src/cfdb/api/profile.py index 91e2846..9d9aed4 100644 --- a/src/cfdb/api/profile.py +++ b/src/cfdb/api/profile.py @@ -51,6 +51,7 @@ class _EcsConfig: subnets: tuple[str, ...] security_groups: tuple[str, ...] assign_public_ip: AssignPublicIp + max_workers: int @dataclass(frozen=True) @@ -152,4 +153,5 @@ def _ecs_config_from_env() -> Optional[_EcsConfig]: subnets=tuple(api.ECS_WORKER_SUBNETS), security_groups=tuple(api.ECS_WORKER_SECURITY_GROUPS), assign_public_ip=api.ECS_WORKER_ASSIGN_PUBLIC_IP, + max_workers=api.ECS_MAX_WORKERS, ) diff --git a/src/cfdb/workflows/provisioner.py b/src/cfdb/workflows/provisioner.py index 0a65b57..79b67e6 100644 --- a/src/cfdb/workflows/provisioner.py +++ b/src/cfdb/workflows/provisioner.py @@ -93,6 +93,18 @@ class EcsProvisioner: max_in_flight: Soft cap on concurrent ``RunTask`` calls. ECS's ``RunTask`` API is rate-limited to ~20 req/s per account; this guard keeps us well under it. + task_family: Task-definition family used to count the current + worker fleet via ``list_tasks`` for the ``max_workers`` cap. + Defaults to ``task_definition`` with any ``:revision`` suffix + stripped. + max_workers: Cap on concurrently-running worker tasks. Before each + ``RunTask`` the provisioner counts running/starting worker tasks + and skips the spawn when already at this cap, so the worker + fleet is bounded while excess jobs stay queued (the durable + scheduler dispatches them as workers free up — no shedding). + ``0`` disables the cap (rely on the Fargate vCPU quota). Soft: + ``list_tasks`` eventual-consistency lag plus a count-then-spawn + race across distinct workflow keys can briefly overshoot. """ def __init__( @@ -107,11 +119,15 @@ def __init__( endpoint_url: Optional[str] = None, region_name: Optional[str] = None, max_in_flight: int = 16, + task_family: Optional[str] = None, + max_workers: int = 0, ) -> None: if not cluster: raise ValueError("EcsProvisioner requires a cluster name") if not task_definition: raise ValueError("EcsProvisioner requires a task_definition") + if max_workers < 0: + raise ValueError(f"max_workers must be >= 0; got {max_workers}") subnet_list = list(subnets) if not subnet_list: raise ValueError("EcsProvisioner requires at least one subnet") @@ -128,6 +144,10 @@ def __init__( self._cluster = cluster self._task_definition = task_definition + # Family used to count the running fleet via ``list_tasks``; strip + # any ``:revision`` so a pinned task-def revision still matches. + self._task_family = task_family or task_definition.split(":", 1)[0] + self._max_workers = max_workers self._subnets = subnet_list self._security_groups = list(security_groups) self._assign_public_ip = assign_public_ip @@ -239,11 +259,58 @@ async def _release_dedup_slot() -> None: self._in_flight.pop(dedup_key, None) try: + # Worker-fleet cap: if the fleet is already at the cap, do not + # launch another task. Return an empty ARN list rather than + # raise — the caller (``_handle_overflow``) reschedules the job, + # so it stays queued and runs when an existing worker frees up. + # This bounds the worker-container count while preserving the + # queue (the admission ceiling, not this cap, bounds the queue). + if self._max_workers > 0: + running = await self._current_worker_count() + if running >= self._max_workers: + logger.info( + "Worker fleet at capacity (%d/%d); not spawning for " + "%s — job stays queued until a worker frees", + running, + self._max_workers, + dedup_key, + ) + return [] async with self._semaphore: return await self._run_task() finally: await asyncio.shield(_release_dedup_slot()) + async def _current_worker_count(self) -> int: + """Count worker tasks whose desired status is RUNNING. + + ``list_tasks`` with ``desiredStatus="RUNNING"`` returns tasks still + starting (PROVISIONING / PENDING / ACTIVATING) as well as those + already running, so the count reflects the workers that exist or are + coming up — the right denominator for the fleet cap. A ``list_tasks`` + failure raises :class:`RetryableProvisionerError` so the caller + queues the job and retries on the next tick rather than spawning + blind past the cap. + + Read-only and cheap, so it runs on the default executor rather than + the owned RunTask pool — ``aclose``'s deterministic drain only needs + to cover the billable RunTask launches. The cap is small (well under + the first ``list_tasks`` page of 100), so the first page is the whole + fleet and pagination is unnecessary to decide whether it is reached. + """ + try: + response = await asyncio.to_thread( + self._client.list_tasks, + cluster=self._cluster, + family=self._task_family, + desiredStatus="RUNNING", + ) + except (ClientError, BotoCoreError) as exc: + raise RetryableProvisionerError( + f"list_tasks failed: {type(exc).__name__}: {exc}" + ) from exc + return len(response.get("taskArns") or []) + async def _run_task(self) -> list[str]: """Single ``RunTask`` invocation translated to a list of task ARNs. diff --git a/tests/test_api/test_lifespan_workerpool.py b/tests/test_api/test_lifespan_workerpool.py index b4681fd..86c40f1 100644 --- a/tests/test_api/test_lifespan_workerpool.py +++ b/tests/test_api/test_lifespan_workerpool.py @@ -166,6 +166,7 @@ async def test_build_discovery_should_yield_ecs_discovery_un_entered( subnets=("subnet-1",), security_groups=(), assign_public_ip="DISABLED", + max_workers=16, ) profile = profile_mod.WorkflowProfile( kind="ecs", diff --git a/tests/test_workflows/test_provisioner.py b/tests/test_workflows/test_provisioner.py index 6b85272..4cbf746 100644 --- a/tests/test_workflows/test_provisioner.py +++ b/tests/test_workflows/test_provisioner.py @@ -12,13 +12,23 @@ class _FakeEcsClient: """In-memory ECS client recording RunTask calls for assertions.""" - def __init__(self, *, response: dict | None = None, raise_on_call: Exception | None = None) -> None: + def __init__( + self, + *, + response: dict | None = None, + raise_on_call: Exception | None = None, + list_tasks_arns: list[str] | None = None, + list_tasks_raise: Exception | None = None, + ) -> None: self.calls: list[dict] = [] + self.list_tasks_calls: list[dict] = [] self._response = response or { "tasks": [{"taskArn": "arn:aws:ecs:::task/cluster/abc"}], "failures": [], } self._raise = raise_on_call + self._list_tasks_arns = list(list_tasks_arns or []) + self._list_tasks_raise = list_tasks_raise def run_task(self, **kwargs): self.calls.append(kwargs) @@ -26,6 +36,12 @@ def run_task(self, **kwargs): raise self._raise return self._response + def list_tasks(self, **kwargs): + self.list_tasks_calls.append(kwargs) + if self._list_tasks_raise is not None: + raise self._list_tasks_raise + return {"taskArns": list(self._list_tasks_arns)} + class _SimpleGatedClient(_FakeEcsClient): """run_task blocks on a threading.Event until released by the test.""" @@ -398,6 +414,152 @@ async def _resolved(value): return value +class TestEcsProvisionerWorkerCap: + def test___init___rejects_negative_max_workers(self): + """Test that a negative max_workers is rejected at construction. + + Given: + A negative ``max_workers`` value. + When: + EcsProvisioner is constructed. + Then: + It should raise ValueError so the misconfiguration is caught at + boot rather than silently disabling the cap. + """ + # Act & assert + with pytest.raises(ValueError, match="max_workers"): + EcsProvisioner( + cluster="c", + task_definition="worker", + subnets=["subnet-1"], + client=_FakeEcsClient(), + max_workers=-1, + ) + + @pytest.mark.asyncio + async def test_request_should_skip_spawn_when_fleet_at_capacity(self): + """Test that request does not launch a task when the fleet is at the cap. + + Given: + A provisioner with ``max_workers=2`` whose ``list_tasks`` reports + two running worker tasks. + When: + request is awaited. + Then: + It should count the fleet, skip RunTask entirely, and return an + empty ARN list so the caller queues the job instead of spawning a + third worker. + """ + # Arrange + client = _FakeEcsClient( + list_tasks_arns=["arn:task/w1", "arn:task/w2"], + ) + provisioner = EcsProvisioner( + cluster="c", + task_definition="worker:7", + subnets=["subnet-1"], + client=client, + max_workers=2, + ) + + # Act + arns = await provisioner.request(dedup_key="wf-1") + + # Assert + assert arns == [] + assert client.calls == [] # RunTask never invoked + assert len(client.list_tasks_calls) == 1 + # The :revision suffix is stripped for the family filter. + assert client.list_tasks_calls[0]["family"] == "worker" + assert client.list_tasks_calls[0]["desiredStatus"] == "RUNNING" + + @pytest.mark.asyncio + async def test_request_should_spawn_when_below_capacity(self): + """Test that request launches a task when the fleet is below the cap. + + Given: + A provisioner with ``max_workers=3`` whose ``list_tasks`` reports + one running worker task. + When: + request is awaited. + Then: + It should launch one worker via RunTask and return its ARN. + """ + # Arrange + client = _FakeEcsClient(list_tasks_arns=["arn:task/w1"]) + provisioner = EcsProvisioner( + cluster="c", + task_definition="worker", + subnets=["subnet-1"], + client=client, + max_workers=3, + ) + + # Act + arns = await provisioner.request(dedup_key="wf-1") + + # Assert + assert arns == ["arn:aws:ecs:::task/cluster/abc"] + assert len(client.calls) == 1 + assert len(client.list_tasks_calls) == 1 + + @pytest.mark.asyncio + async def test_request_should_not_count_fleet_when_cap_disabled(self): + """Test that max_workers=0 disables the fleet count entirely. + + Given: + A provisioner with the default ``max_workers=0`` (cap disabled). + When: + request is awaited. + Then: + It should launch a worker without ever calling ``list_tasks``, so + an unset cap incurs no extra ECS round-trip and never throttles. + """ + # Arrange + client = _FakeEcsClient(list_tasks_arns=["arn:task/w1", "arn:task/w2"]) + provisioner = EcsProvisioner( + cluster="c", + task_definition="worker", + subnets=["subnet-1"], + client=client, + ) + + # Act + arns = await provisioner.request(dedup_key="wf-1") + + # Assert + assert arns == ["arn:aws:ecs:::task/cluster/abc"] + assert client.list_tasks_calls == [] + + @pytest.mark.asyncio + async def test_request_should_raise_retryable_when_list_tasks_fails(self): + """Test that a list_tasks failure surfaces as a retryable error. + + Given: + A provisioner with a cap whose ``list_tasks`` raises a + botocore ClientError. + When: + request is awaited. + Then: + It should raise RetryableProvisionerError (so the caller queues + and retries) and never launch a task blind past the cap. + """ + # Arrange + client = _FakeEcsClient(list_tasks_raise=_client_error("ThrottlingException")) + provisioner = EcsProvisioner( + cluster="c", + task_definition="worker", + subnets=["subnet-1"], + client=client, + max_workers=2, + ) + + # Act & assert + with pytest.raises(RetryableProvisionerError, match="list_tasks failed"): + await provisioner.request(dedup_key="wf-1") + assert client.calls == [] + + # --------------------------------------------------------------------------- # Moto-backed wire-shape verification. #