From e3c8df7ebde3a8cbdc7e9ac751151bdf40573132 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 23 Jun 2026 15:16:18 -0400 Subject: [PATCH] fix: Bound the ECS worker cap against concurrent cold-start bursts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The worker-fleet cap counted only list_tasks before each RunTask. ECS list_tasks is eventually consistent, so a simultaneous burst of distinct uncached files — the exact scenario the cap guards — had every overflow spawn-decision observe a stale ~0 count and launch a worker. Verified live: a 24-file burst drove 24 Fargate workers (peak 24, cap 16). Count this provisioner's own recently-issued launches (aged out after a visibility window) on top of the list_tasks fleet, under a lock so concurrent burst deciders observe each other's reservations. A failed RunTask releases its reservation so a spawn that never happened is not over-counted. A single API task is now held to the cap; a multi-task API would each track only its own launches and need a shared lease (documented). Add a regression test that drives a concurrent burst against a permanently list_tasks-blind client and asserts at most max_workers RunTask calls happen — it fails on the previous list_tasks-only cap. --- README.md | 2 +- src/cfdb/workflows/provisioner.py | 109 ++++++++++++++++++----- tests/test_workflows/test_provisioner.py | 39 ++++++++ 3 files changed, 129 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 0b31e05..81fce1c 100644 --- a/README.md +++ b/README.md @@ -673,7 +673,7 @@ Bounded-concurrency control (issue #45): - `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process. - `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs combined — i.e. the **queue depth plus running**, not the worker count). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). To cap the worker fleet without limiting the queue, use `ECS_MAX_WORKERS` instead. Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**. -- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts running/starting worker tasks (`list_tasks`) and skips the spawn when at the cap, so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up — it does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Soft cap — `list_tasks` eventual-consistency lag plus a count-then-spawn race can briefly overshoot. Set on the **API**. +- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts the ECS-visible fleet (`list_tasks`) **plus its own recently-issued launches that `list_tasks` may not reflect yet**, under a lock, and skips the spawn when at the cap — so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up. It does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Counting in-flight launches is what bounds a simultaneous cold-start burst; counting only `list_tasks` (which is eventually consistent) lets a concurrent burst see a stale count and every spawn slip under the cap. Bounded per API task: a single API task (the default) is held to the cap; a multi-task API would each track only its own launches and need a shared lease. Set on the **API**. - `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**. - `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**. diff --git a/src/cfdb/workflows/provisioner.py b/src/cfdb/workflows/provisioner.py index 79b67e6..f5e1300 100644 --- a/src/cfdb/workflows/provisioner.py +++ b/src/cfdb/workflows/provisioner.py @@ -19,6 +19,7 @@ import concurrent.futures import logging import threading +import time from collections.abc import Iterable from typing import Any, Literal, Optional, get_args @@ -35,6 +36,17 @@ _ASSIGN_PUBLIC_IP_VALUES = frozenset(get_args(AssignPublicIp)) +#: How long a just-issued ``RunTask`` is counted as an in-flight worker +#: launch on top of what ``list_tasks`` reports. ECS ``list_tasks`` is +#: eventually consistent — a freshly launched task is not visible for a few +#: seconds — so a simultaneous burst would otherwise see a stale count and +#: every spawn would slip under the cap. Counting recent launches for this +#: window closes that race; it MUST exceed the real ``list_tasks`` visibility +#: lag (err high — over-counting is conservative, only briefly delaying a +#: spawn, whereas under-counting lets the fleet overshoot the cap). +_LAUNCH_VISIBILITY_WINDOW_S = 60.0 + + class _SubmittedRunTask: """Bookkeeping slot for an in-flight ``RunTask`` future. @@ -98,13 +110,18 @@ class EcsProvisioner: Defaults to ``task_definition`` with any ``:revision`` suffix stripped. max_workers: Cap on concurrently-running worker tasks. Before each - ``RunTask`` the provisioner counts running/starting worker tasks - and skips the spawn when already at this cap, so the worker - fleet is bounded while excess jobs stay queued (the durable - scheduler dispatches them as workers free up — no shedding). - ``0`` disables the cap (rely on the Fargate vCPU quota). Soft: - ``list_tasks`` eventual-consistency lag plus a count-then-spawn - race across distinct workflow keys can briefly overshoot. + ``RunTask`` the provisioner counts the ECS-visible fleet plus its + own recently-issued launches (which ``list_tasks`` may not + reflect yet) under a lock, and skips the spawn when already at + this cap, so the worker fleet is bounded while excess jobs stay + queued (the durable scheduler dispatches them as workers free up + — no shedding). ``0`` disables the cap (rely on the Fargate vCPU + quota). Counting in-flight launches is what bounds a simultaneous + cold-start burst; counting only ``list_tasks`` lets a concurrent + burst see a stale count and every spawn slip under the cap. + Bounded per API task: a single API task (the default) is held to + the cap; a multi-task API would each track only its own launches + and need a shared lease to bound the total. """ def __init__( @@ -178,6 +195,14 @@ def __init__( ) self._pending_lock = threading.Lock() self._pending: set[_SubmittedRunTask] = set() + # Worker-fleet cap accounting. ``_recent_launches`` holds the + # monotonic timestamps of RunTask launches this provisioner issued + # but that ``list_tasks`` may not reflect yet; the cap counts them on + # top of the ECS-visible fleet so a concurrent burst cannot all slip + # under a stale count. ``_cap_lock`` serializes the count-and-reserve + # so concurrent deciders observe each other's reservations. + self._cap_lock = asyncio.Lock() + self._recent_launches: list[float] = [] async def request(self, *, dedup_key: str) -> list[str]: """Launch a worker task, returning its ARN(s). @@ -259,28 +284,72 @@ async def _release_dedup_slot() -> None: self._in_flight.pop(dedup_key, None) try: - # Worker-fleet cap: if the fleet is already at the cap, do not - # launch another task. Return an empty ARN list rather than + # Worker-fleet cap: reserve a slot under the cap before launching. + # When the fleet is full, return an empty ARN list rather than # raise — the caller (``_handle_overflow``) reschedules the job, # so it stays queued and runs when an existing worker frees up. # This bounds the worker-container count while preserving the # queue (the admission ceiling, not this cap, bounds the queue). + token: Optional[float] = None if self._max_workers > 0: - running = await self._current_worker_count() - if running >= self._max_workers: - logger.info( - "Worker fleet at capacity (%d/%d); not spawning for " - "%s — job stays queued until a worker frees", - running, - self._max_workers, - dedup_key, - ) + token = await self._reserve_worker_slot(dedup_key) + if token is None: return [] - async with self._semaphore: - return await self._run_task() + try: + async with self._semaphore: + return await self._run_task() + except BaseException: + # The launch did not produce a worker; release the reserved + # slot so the in-flight count does not over-count a spawn + # that never happened. + if token is not None: + await self._release_worker_slot(token) + raise finally: await asyncio.shield(_release_dedup_slot()) + async def _reserve_worker_slot(self, dedup_key: str) -> Optional[float]: + """Reserve a worker slot under the cap, or return None when full. + + Counts the ECS-visible fleet PLUS this provisioner's own recent + launches (``_recent_launches``) that ``list_tasks`` may not reflect + yet, all under ``_cap_lock`` so concurrent burst deciders observe + each other's reservations. This is what bounds a simultaneous + cold-start burst: counting only ``list_tasks`` lets every decider see + a stale ~0 and all spawn. Returns the launch token (a monotonic + timestamp) on success, or None when already at the cap. + """ + async with self._cap_lock: + now = time.monotonic() + self._recent_launches = [ + t + for t in self._recent_launches + if now - t < _LAUNCH_VISIBILITY_WINDOW_S + ] + ecs_visible = await self._current_worker_count() + effective = ecs_visible + len(self._recent_launches) + if effective >= self._max_workers: + logger.info( + "Worker fleet at capacity (%d running + %d in-flight " + ">= %d); not spawning for %s — job stays queued until a " + "worker frees", + ecs_visible, + len(self._recent_launches), + self._max_workers, + dedup_key, + ) + return None + self._recent_launches.append(now) + return now + + async def _release_worker_slot(self, token: float) -> None: + """Drop a reservation whose RunTask did not produce a worker.""" + async with self._cap_lock: + try: + self._recent_launches.remove(token) + except ValueError: + pass # already aged out of the visibility window + async def _current_worker_count(self) -> int: """Count worker tasks whose desired status is RUNNING. diff --git a/tests/test_workflows/test_provisioner.py b/tests/test_workflows/test_provisioner.py index 4cbf746..f71ea44 100644 --- a/tests/test_workflows/test_provisioner.py +++ b/tests/test_workflows/test_provisioner.py @@ -531,6 +531,45 @@ async def test_request_should_not_count_fleet_when_cap_disabled(self): assert arns == ["arn:aws:ecs:::task/cluster/abc"] assert client.list_tasks_calls == [] + @pytest.mark.asyncio + async def test_request_should_cap_a_concurrent_cold_fleet_burst(self): + """Test that a simultaneous burst from a list_tasks-blind fleet is capped. + + Given: + ``max_workers=2`` and a client whose ``list_tasks`` always reports + zero running tasks — modeling the ECS eventual-consistency lag + where freshly launched workers are not yet visible (the live + failure mode where every decider saw a stale 0 and all spawned). + When: + Five distinct-key requests are awaited concurrently. + Then: + Only two RunTask calls happen: the in-flight-launch accounting + bounds the burst even though ``list_tasks`` never reflects the new + workers, and the excess three requests return [] (queue, not + spawn). + """ + # Arrange — list_tasks is permanently blind (always 0 visible). + client = _FakeEcsClient(list_tasks_arns=[]) + provisioner = EcsProvisioner( + cluster="c", + task_definition="worker", + subnets=["subnet-1"], + client=client, + max_workers=2, + ) + + # Act + results = await asyncio.gather( + *(provisioner.request(dedup_key=f"wf-{i}") for i in range(5)) + ) + + # Assert — at most the cap launched, despite list_tasks reporting 0. + spawned = [r for r in results if r] + skipped = [r for r in results if r == []] + assert len(client.calls) == 2, f"expected 2 RunTask, got {len(client.calls)}" + assert len(spawned) == 2 + assert len(skipped) == 3 + @pytest.mark.asyncio async def test_request_should_raise_retryable_when_list_tasks_fails(self): """Test that a list_tasks failure surfaces as a retryable error.