diff --git a/README.md b/README.md
index f7e4ce8..0b31e05 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,8 @@ Requires Python 3.11 or later.
 |----------|-------------|---------|
 | `SYNC_API_KEY` | API key for the sync endpoint. If unset, sync is unprotected (suitable for local dev). | - |
 | `SYNC_DATA_DIR` | Root directory for downloaded sync data, the workflow cache (`$SYNC_DATA_DIR/cache`), and per-job workdirs (`$SYNC_DATA_DIR/jobs`). When unset the preprocessing/indexing workflow subsystem is disabled: `/data` falls through to direct upstream streaming, `/index` returns 503 for processable formats (BAM/VCF/etc.) and 404 for passthrough formats (CSV/TSV/bigWig). Both subdirectories must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity. | - |
-| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker, so in the ECS profile concurrency is bounded by the AWS Fargate vCPU service quota. | `2` |
+| `WORKFLOW_WORKER_COUNT` | Local-dev only: number of workers the LAN pool (`worker_lan`) spawns. The API no longer leases a fixed count — its pool admits every discovered worker. In the ECS profile the concurrent worker fleet is bounded by `ECS_MAX_WORKERS`, with the AWS Fargate vCPU service quota as the hard ceiling. | `2` |
+| `ECS_MAX_WORKERS` | ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks. The provisioner counts the fleet before each `RunTask` and queues rather than spawning past this, so it bounds workers without limiting the queue (that is `CFDB_WORKFLOW_MAX_ACTIVE`). `0` disables the cap. | `16` |
 | `WORKFLOW_POOL_NAMESPACE` | wool LAN discovery namespace shared by the API and the worker-pool process. Both processes MUST set the same value or dispatch will hang on `NoWorkersAvailable`. | `cfdb-workers` |
 | `CFDB_WORKER_TLS_CA` | Path to the shared CA certificate for the wool worker gRPC channel. When this and the cert/key below are all set, the API↔worker dispatch channel uses mutual TLS (`mutual=True`); when all three are unset the channel stays plaintext. Partial config fails fast at startup. The API and every worker MUST use certs signed by the same CA. See [Worker mTLS](#worker-mtls). | - |
 | `CFDB_WORKER_TLS_CERT` | Path to this process's PEM certificate on the worker gRPC channel — the worker leaf cert on a worker (`worker_main`/`worker_lan`), the API client cert on the API. Must be signed by `CFDB_WORKER_TLS_CA`. | - |
@@ -665,13 +666,14 @@ Cache keys are content-addressed using each file's upstream `md5`, so a byte cha
 Required environment variables:
 
 - `SYNC_DATA_DIR` — directory under which the workflow cache and per-job workdirs live. Both subdirectories (`$SYNC_DATA_DIR/cache` and `$SYNC_DATA_DIR/jobs`) must share a filesystem because `LocalFsCache.put` relies on `os.replace` atomicity; the API asserts this at startup and fails fast if they live on different volumes. When unset, the workflow subsystem is disabled, `/data` falls through to direct upstream streaming, `/index` returns 404 for passthrough formats (CSV/TSV/bigWig — there is no index in any state of the world), and `/index` returns 503 for processable formats that would otherwise dispatch a workflow (sidecar-served files still work).
-- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), so the **maximum concurrent worker count is bounded by the AWS Fargate vCPU service quota** for the account/region — raise it in Service Quotas; at 1 vCPU per worker it maps roughly 1:1 to concurrent workers. A burst of N distinct uncached files can launch up to ~N workers concurrently, subject to that quota.
+- `WORKFLOW_WORKER_COUNT` — local-dev only: how many workers the LAN pool (`python -m cfdb.workflows.worker_lan`) spawns and publishes (default `2`). The API itself no longer leases a fixed count — its `WorkerPool` admits every worker discovery surfaces. In the ECS profile one ephemeral worker is launched per workflow (via `EcsProvisioner` `RunTask`), and the concurrent worker fleet is bounded by `ECS_MAX_WORKERS` (default `16`; see the bounded-concurrency knobs below) with the **AWS Fargate vCPU service quota** as the hard ceiling above it. A burst of N distinct uncached files launches up to `min(N, ECS_MAX_WORKERS)` workers; the rest queue.
 - `WORKFLOW_POOL_NAMESPACE` — wool discovery namespace shared by the API and the external worker pool (default `cfdb-workers`). Both processes must agree on this value or dispatch will hang waiting for workers.
 
 Bounded-concurrency control (issue #45):
 
 - `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process.
-- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
+- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs combined — i.e. the **queue depth plus running**, not the worker count). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). To cap the worker fleet without limiting the queue, use `ECS_MAX_WORKERS` instead. Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
+- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts running/starting worker tasks (`list_tasks`) and skips the spawn when at the cap, so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up — it does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Soft cap — `list_tasks` eventual-consistency lag plus a count-then-spawn race can briefly overshoot. Set on the **API**.
 - `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**.
 - `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**.
 
diff --git a/cloudformation/backend.yml b/cloudformation/backend.yml
index d4d479f..06c1e70 100644
--- a/cloudformation/backend.yml
+++ b/cloudformation/backend.yml
@@ -35,6 +35,17 @@ Parameters:
     Type: Number
     Default: 1
     Description: Number of ECS tasks
+  EcsMaxWorkers:
+    Type: Number
+    Default: 16
+    Description: >
+      Cap on concurrently-running ephemeral Fargate worker tasks, set as
+      ECS_MAX_WORKERS. Before each RunTask the provisioner counts running
+      worker tasks and skips the spawn when at this cap, so the worker fleet
+      is bounded while excess jobs stay queued (the durable scheduler
+      dispatches them as workers free up). This bounds WORKERS, not the
+      queue — the queue/admission bound is CFDB_WORKFLOW_MAX_ACTIVE. 0
+      disables the cap (rely on the Fargate vCPU quota).
   HostedZoneName:
     Type: String
     Default: vis-api.link
@@ -237,6 +248,11 @@ Resources:
               Value: !Ref WorkflowS3Prefix
             - Name: AWS_REGION
               Value: !Ref AWS::Region
+            # Cap on concurrently-running ephemeral worker tasks: the
+            # provisioner counts the fleet before each RunTask and queues
+            # rather than spawning past this. Bounds workers, not the queue.
+            - Name: ECS_MAX_WORKERS
+              Value: !Ref EcsMaxWorkers
             # The cert PEMs (when mTLS is enabled) are injected as env vars
             # and written to files by the image entrypoint
             # (cfdb-tls-entrypoint.sh), which points CFDB_WORKER_TLS_CA/CERT/KEY
diff --git a/src/cfdb/api/__init__.py b/src/cfdb/api/__init__.py
index b315b3c..a392fee 100644
--- a/src/cfdb/api/__init__.py
+++ b/src/cfdb/api/__init__.py
@@ -223,6 +223,17 @@ def _parse_assign_public_ip(name: str, default: str) -> str:
     "ECS_WORKER_ASSIGN_PUBLIC_IP", "DISABLED"
 )
 
+#: Cap on concurrently-running ephemeral worker tasks on the ECS profile.
+#: Before each RunTask the provisioner counts running/starting worker tasks
+#: and skips the spawn when already at this cap, so the worker fleet is
+#: bounded while excess jobs stay queued (the durable scheduler dispatches
+#: them as workers free up — no shedding; the queue is bounded separately by
+#: CFDB_WORKFLOW_MAX_ACTIVE). ``0`` disables the cap (rely on the Fargate
+#: vCPU quota). Default 16 so an unconfigured ECS deployment fails safe to a
+#: small fleet rather than the account quota. Applies only to the ECS
+#: profile (no provisioner exists in the local/LAN profile).
+ECS_MAX_WORKERS: Final = _parse_int_env("ECS_MAX_WORKERS", 16, minimum=0)
+
 db: AsyncIOMotorDatabase | None = None
 cache: "CacheBackend | None" = None
 executor: "JobExecutor | None" = None
diff --git a/src/cfdb/api/main.py b/src/cfdb/api/main.py
index 53db36d..7b6831b 100644
--- a/src/cfdb/api/main.py
+++ b/src/cfdb/api/main.py
@@ -180,9 +180,11 @@ def _build_provisioner(profile: WorkflowProfile) -> Optional[EcsProvisioner]:
     return EcsProvisioner(
         cluster=profile.ecs.cluster,
         task_definition=profile.ecs.task_definition,
+        task_family=profile.ecs.task_family,
         subnets=profile.ecs.subnets,
         security_groups=profile.ecs.security_groups,
         assign_public_ip=profile.ecs.assign_public_ip,
+        max_workers=profile.ecs.max_workers,
         endpoint_url=profile.aws_endpoint_url,
         region_name=profile.aws_region,
     )
diff --git a/src/cfdb/api/profile.py b/src/cfdb/api/profile.py
index 91e2846..9d9aed4 100644
--- a/src/cfdb/api/profile.py
+++ b/src/cfdb/api/profile.py
@@ -51,6 +51,7 @@ class _EcsConfig:
     subnets: tuple[str, ...]
     security_groups: tuple[str, ...]
     assign_public_ip: AssignPublicIp
+    max_workers: int
 
 
 @dataclass(frozen=True)
@@ -152,4 +153,5 @@ def _ecs_config_from_env() -> Optional[_EcsConfig]:
         subnets=tuple(api.ECS_WORKER_SUBNETS),
         security_groups=tuple(api.ECS_WORKER_SECURITY_GROUPS),
         assign_public_ip=api.ECS_WORKER_ASSIGN_PUBLIC_IP,
+        max_workers=api.ECS_MAX_WORKERS,
     )
diff --git a/src/cfdb/workflows/provisioner.py b/src/cfdb/workflows/provisioner.py
index 0a65b57..79b67e6 100644
--- a/src/cfdb/workflows/provisioner.py
+++ b/src/cfdb/workflows/provisioner.py
@@ -93,6 +93,18 @@ class EcsProvisioner:
         max_in_flight: Soft cap on concurrent ``RunTask`` calls. ECS's
             ``RunTask`` API is rate-limited to ~20 req/s per account; this
             guard keeps us well under it.
+        task_family: Task-definition family used to count the current
+            worker fleet via ``list_tasks`` for the ``max_workers`` cap.
+            Defaults to ``task_definition`` with any ``:revision`` suffix
+            stripped.
+        max_workers: Cap on concurrently-running worker tasks. Before each
+            ``RunTask`` the provisioner counts running/starting worker tasks
+            and skips the spawn when already at this cap, so the worker
+            fleet is bounded while excess jobs stay queued (the durable
+            scheduler dispatches them as workers free up — no shedding).
+            ``0`` disables the cap (rely on the Fargate vCPU quota). Soft:
+            ``list_tasks`` eventual-consistency lag plus a count-then-spawn
+            race across distinct workflow keys can briefly overshoot.
     """
 
     def __init__(
@@ -107,11 +119,15 @@ def __init__(
         endpoint_url: Optional[str] = None,
         region_name: Optional[str] = None,
         max_in_flight: int = 16,
+        task_family: Optional[str] = None,
+        max_workers: int = 0,
     ) -> None:
         if not cluster:
             raise ValueError("EcsProvisioner requires a cluster name")
         if not task_definition:
             raise ValueError("EcsProvisioner requires a task_definition")
+        if max_workers < 0:
+            raise ValueError(f"max_workers must be >= 0; got {max_workers}")
         subnet_list = list(subnets)
         if not subnet_list:
             raise ValueError("EcsProvisioner requires at least one subnet")
@@ -128,6 +144,10 @@ def __init__(
 
         self._cluster = cluster
         self._task_definition = task_definition
+        # Family used to count the running fleet via ``list_tasks``; strip
+        # any ``:revision`` so a pinned task-def revision still matches.
+        self._task_family = task_family or task_definition.split(":", 1)[0]
+        self._max_workers = max_workers
         self._subnets = subnet_list
         self._security_groups = list(security_groups)
         self._assign_public_ip = assign_public_ip
@@ -239,11 +259,58 @@ async def _release_dedup_slot() -> None:
                 self._in_flight.pop(dedup_key, None)
 
         try:
+            # Worker-fleet cap: if the fleet is already at the cap, do not
+            # launch another task. Return an empty ARN list rather than
+            # raise — the caller (``_handle_overflow``) reschedules the job,
+            # so it stays queued and runs when an existing worker frees up.
+            # This bounds the worker-container count while preserving the
+            # queue (the admission ceiling, not this cap, bounds the queue).
+            if self._max_workers > 0:
+                running = await self._current_worker_count()
+                if running >= self._max_workers:
+                    logger.info(
+                        "Worker fleet at capacity (%d/%d); not spawning for "
+                        "%s — job stays queued until a worker frees",
+                        running,
+                        self._max_workers,
+                        dedup_key,
+                    )
+                    return []
             async with self._semaphore:
                 return await self._run_task()
         finally:
             await asyncio.shield(_release_dedup_slot())
 
+    async def _current_worker_count(self) -> int:
+        """Count worker tasks whose desired status is RUNNING.
+
+        ``list_tasks`` with ``desiredStatus="RUNNING"`` returns tasks still
+        starting (PROVISIONING / PENDING / ACTIVATING) as well as those
+        already running, so the count reflects the workers that exist or are
+        coming up — the right denominator for the fleet cap. A ``list_tasks``
+        failure raises :class:`RetryableProvisionerError` so the caller
+        queues the job and retries on the next tick rather than spawning
+        blind past the cap.
+
+        Read-only and cheap, so it runs on the default executor rather than
+        the owned RunTask pool — ``aclose``'s deterministic drain only needs
+        to cover the billable RunTask launches. The cap is small (well under
+        the first ``list_tasks`` page of 100), so the first page is the whole
+        fleet and pagination is unnecessary to decide whether it is reached.
+        """
+        try:
+            response = await asyncio.to_thread(
+                self._client.list_tasks,
+                cluster=self._cluster,
+                family=self._task_family,
+                desiredStatus="RUNNING",
+            )
+        except (ClientError, BotoCoreError) as exc:
+            raise RetryableProvisionerError(
+                f"list_tasks failed: {type(exc).__name__}: {exc}"
+            ) from exc
+        return len(response.get("taskArns") or [])
+
     async def _run_task(self) -> list[str]:
         """Single ``RunTask`` invocation translated to a list of task ARNs.
 
diff --git a/tests/test_api/test_lifespan_workerpool.py b/tests/test_api/test_lifespan_workerpool.py
index b4681fd..86c40f1 100644
--- a/tests/test_api/test_lifespan_workerpool.py
+++ b/tests/test_api/test_lifespan_workerpool.py
@@ -166,6 +166,7 @@ async def test_build_discovery_should_yield_ecs_discovery_un_entered(
         subnets=("subnet-1",),
         security_groups=(),
         assign_public_ip="DISABLED",
+        max_workers=16,
     )
     profile = profile_mod.WorkflowProfile(
         kind="ecs",
diff --git a/tests/test_workflows/test_provisioner.py b/tests/test_workflows/test_provisioner.py
index 6b85272..4cbf746 100644
--- a/tests/test_workflows/test_provisioner.py
+++ b/tests/test_workflows/test_provisioner.py
@@ -12,13 +12,23 @@
 class _FakeEcsClient:
     """In-memory ECS client recording RunTask calls for assertions."""
 
-    def __init__(self, *, response: dict | None = None, raise_on_call: Exception | None = None) -> None:
+    def __init__(
+        self,
+        *,
+        response: dict | None = None,
+        raise_on_call: Exception | None = None,
+        list_tasks_arns: list[str] | None = None,
+        list_tasks_raise: Exception | None = None,
+    ) -> None:
         self.calls: list[dict] = []
+        self.list_tasks_calls: list[dict] = []
         self._response = response or {
             "tasks": [{"taskArn": "arn:aws:ecs:::task/cluster/abc"}],
             "failures": [],
         }
         self._raise = raise_on_call
+        self._list_tasks_arns = list(list_tasks_arns or [])
+        self._list_tasks_raise = list_tasks_raise
 
     def run_task(self, **kwargs):
         self.calls.append(kwargs)
@@ -26,6 +36,12 @@ def run_task(self, **kwargs):
             raise self._raise
         return self._response
 
+    def list_tasks(self, **kwargs):
+        self.list_tasks_calls.append(kwargs)
+        if self._list_tasks_raise is not None:
+            raise self._list_tasks_raise
+        return {"taskArns": list(self._list_tasks_arns)}
+
 
 class _SimpleGatedClient(_FakeEcsClient):
     """run_task blocks on a threading.Event until released by the test."""
@@ -398,6 +414,152 @@ async def _resolved(value):
     return value
 
 
+class TestEcsProvisionerWorkerCap:
+    def test___init___rejects_negative_max_workers(self):
+        """Test that a negative max_workers is rejected at construction.
+
+        Given:
+            A negative ``max_workers`` value.
+        When:
+            EcsProvisioner is constructed.
+        Then:
+            It should raise ValueError so the misconfiguration is caught at
+            boot rather than silently disabling the cap.
+        """
+        # Act & assert
+        with pytest.raises(ValueError, match="max_workers"):
+            EcsProvisioner(
+                cluster="c",
+                task_definition="worker",
+                subnets=["subnet-1"],
+                client=_FakeEcsClient(),
+                max_workers=-1,
+            )
+
+    @pytest.mark.asyncio
+    async def test_request_should_skip_spawn_when_fleet_at_capacity(self):
+        """Test that request does not launch a task when the fleet is at the cap.
+
+        Given:
+            A provisioner with ``max_workers=2`` whose ``list_tasks`` reports
+            two running worker tasks.
+        When:
+            request is awaited.
+        Then:
+            It should count the fleet, skip RunTask entirely, and return an
+            empty ARN list so the caller queues the job instead of spawning a
+            third worker.
+        """
+        # Arrange
+        client = _FakeEcsClient(
+            list_tasks_arns=["arn:task/w1", "arn:task/w2"],
+        )
+        provisioner = EcsProvisioner(
+            cluster="c",
+            task_definition="worker:7",
+            subnets=["subnet-1"],
+            client=client,
+            max_workers=2,
+        )
+
+        # Act
+        arns = await provisioner.request(dedup_key="wf-1")
+
+        # Assert
+        assert arns == []
+        assert client.calls == []  # RunTask never invoked
+        assert len(client.list_tasks_calls) == 1
+        # The :revision suffix is stripped for the family filter.
+        assert client.list_tasks_calls[0]["family"] == "worker"
+        assert client.list_tasks_calls[0]["desiredStatus"] == "RUNNING"
+
+    @pytest.mark.asyncio
+    async def test_request_should_spawn_when_below_capacity(self):
+        """Test that request launches a task when the fleet is below the cap.
+
+        Given:
+            A provisioner with ``max_workers=3`` whose ``list_tasks`` reports
+            one running worker task.
+        When:
+            request is awaited.
+        Then:
+            It should launch one worker via RunTask and return its ARN.
+        """
+        # Arrange
+        client = _FakeEcsClient(list_tasks_arns=["arn:task/w1"])
+        provisioner = EcsProvisioner(
+            cluster="c",
+            task_definition="worker",
+            subnets=["subnet-1"],
+            client=client,
+            max_workers=3,
+        )
+
+        # Act
+        arns = await provisioner.request(dedup_key="wf-1")
+
+        # Assert
+        assert arns == ["arn:aws:ecs:::task/cluster/abc"]
+        assert len(client.calls) == 1
+        assert len(client.list_tasks_calls) == 1
+
+    @pytest.mark.asyncio
+    async def test_request_should_not_count_fleet_when_cap_disabled(self):
+        """Test that max_workers=0 disables the fleet count entirely.
+
+        Given:
+            A provisioner with the default ``max_workers=0`` (cap disabled).
+        When:
+            request is awaited.
+        Then:
+            It should launch a worker without ever calling ``list_tasks``, so
+            an unset cap incurs no extra ECS round-trip and never throttles.
+        """
+        # Arrange
+        client = _FakeEcsClient(list_tasks_arns=["arn:task/w1", "arn:task/w2"])
+        provisioner = EcsProvisioner(
+            cluster="c",
+            task_definition="worker",
+            subnets=["subnet-1"],
+            client=client,
+        )
+
+        # Act
+        arns = await provisioner.request(dedup_key="wf-1")
+
+        # Assert
+        assert arns == ["arn:aws:ecs:::task/cluster/abc"]
+        assert client.list_tasks_calls == []
+
+    @pytest.mark.asyncio
+    async def test_request_should_raise_retryable_when_list_tasks_fails(self):
+        """Test that a list_tasks failure surfaces as a retryable error.
+
+        Given:
+            A provisioner with a cap whose ``list_tasks`` raises a
+            botocore ClientError.
+        When:
+            request is awaited.
+        Then:
+            It should raise RetryableProvisionerError (so the caller queues
+            and retries) and never launch a task blind past the cap.
+        """
+        # Arrange
+        client = _FakeEcsClient(list_tasks_raise=_client_error("ThrottlingException"))
+        provisioner = EcsProvisioner(
+            cluster="c",
+            task_definition="worker",
+            subnets=["subnet-1"],
+            client=client,
+            max_workers=2,
+        )
+
+        # Act & assert
+        with pytest.raises(RetryableProvisionerError, match="list_tasks failed"):
+            await provisioner.request(dedup_key="wf-1")
+        assert client.calls == []
+
+
 # ---------------------------------------------------------------------------
 # Moto-backed wire-shape verification.
 #