Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ Bounded-concurrency control (issue #45):

- `CFDB_WORKER_MAX_CONCURRENT_TASKS` — per-worker backpressure threshold: a worker rejects a dispatch (gRPC `RESOURCE_EXHAUSTED`, which the priority load balancer treats as transient and rotates past) once it already has this many tasks in flight (default `1`, to serialize the subprocess pipelines on a 1-vCPU worker; `0` disables backpressure). Set on the **worker** process.
- `CFDB_WORKFLOW_MAX_ACTIVE` — admission ceiling on concurrently active workflows (`pending` + `running` jobs combined — i.e. the **queue depth plus running**, not the worker count). Once this many are active, `/data` and `/index` shed new preprocessing requests with `429 Retry-After` before claiming the per-file mutex (default `1024`). To cap the worker fleet without limiting the queue, use `ECS_MAX_WORKERS` instead. Soft cap — a count-then-claim race may briefly overshoot. Set on the **API**.
- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts running/starting worker tasks (`list_tasks`) and skips the spawn when at the cap, so the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up — it does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Soft cap — `list_tasks` eventual-consistency lag plus a count-then-spawn race can briefly overshoot. Set on the **API**.
- `ECS_MAX_WORKERS` — ECS profile only: cap on concurrently-running ephemeral Fargate worker tasks (default `16`; `0` disables the cap and relies on the Fargate vCPU quota). Before each `RunTask` the `EcsProvisioner` counts the ECS-visible fleet (`list_tasks`) **plus its own recently-issued launches that `list_tasks` may not reflect yet**, under a lock, and skips the spawn when at the capso the **worker fleet** is bounded while excess jobs stay `pending` and the durable scheduler dispatches them as workers free up. It does **not** shed (that is `CFDB_WORKFLOW_MAX_ACTIVE`'s job) and does **not** limit the queue. Counting in-flight launches is what bounds a simultaneous cold-start burst; counting only `list_tasks` (which is eventually consistent) lets a concurrent burst see a stale count and every spawn slip under the cap. Bounded per API task: a single API task (the default) is held to the cap; a multi-task API would each track only its own launches and need a shared lease. Set on the **API**.
- `CFDB_WORKFLOW_RETRY_INTERVAL_S` — base cadence (plus a small random jitter) at which the durable retry scheduler re-attempts dispatch for a job awaiting worker capacity (default `120`, i.e. 2 min). A dispatch attempt that finds no free worker leaves the job `pending` and rescheduled rather than blocking the request. Set on the **API**.
- `CFDB_WORKFLOW_DISPATCH_DEADLINE_S` — how long a job may wait for worker capacity (re-attempted on the retry cadence above) before the scheduler fails it with a `capacity:`-prefixed error (default `14400`, i.e. 4 h). Replaces the former single in-request `CFDB_WORKFLOW_DISPATCH_WAIT_S` wait — instead of blocking one request on a cold start, the job queues durably and is retried to this deadline. Set on the **API**.

Expand Down
109 changes: 89 additions & 20 deletions src/cfdb/workflows/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import concurrent.futures
import logging
import threading
import time
from collections.abc import Iterable
from typing import Any, Literal, Optional, get_args

Expand All @@ -35,6 +36,17 @@
_ASSIGN_PUBLIC_IP_VALUES = frozenset(get_args(AssignPublicIp))


#: How long a just-issued ``RunTask`` is counted as an in-flight worker
#: launch on top of what ``list_tasks`` reports. ECS ``list_tasks`` is
#: eventually consistent — a freshly launched task is not visible for a few
#: seconds — so a simultaneous burst would otherwise see a stale count and
#: every spawn would slip under the cap. Counting recent launches for this
#: window closes that race; it MUST exceed the real ``list_tasks`` visibility
#: lag (err high — over-counting is conservative, only briefly delaying a
#: spawn, whereas under-counting lets the fleet overshoot the cap).
_LAUNCH_VISIBILITY_WINDOW_S = 60.0


class _SubmittedRunTask:
"""Bookkeeping slot for an in-flight ``RunTask`` future.

Expand Down Expand Up @@ -98,13 +110,18 @@ class EcsProvisioner:
Defaults to ``task_definition`` with any ``:revision`` suffix
stripped.
max_workers: Cap on concurrently-running worker tasks. Before each
``RunTask`` the provisioner counts running/starting worker tasks
and skips the spawn when already at this cap, so the worker
fleet is bounded while excess jobs stay queued (the durable
scheduler dispatches them as workers free up — no shedding).
``0`` disables the cap (rely on the Fargate vCPU quota). Soft:
``list_tasks`` eventual-consistency lag plus a count-then-spawn
race across distinct workflow keys can briefly overshoot.
``RunTask`` the provisioner counts the ECS-visible fleet plus its
own recently-issued launches (which ``list_tasks`` may not
reflect yet) under a lock, and skips the spawn when already at
this cap, so the worker fleet is bounded while excess jobs stay
queued (the durable scheduler dispatches them as workers free up
— no shedding). ``0`` disables the cap (rely on the Fargate vCPU
quota). Counting in-flight launches is what bounds a simultaneous
cold-start burst; counting only ``list_tasks`` lets a concurrent
burst see a stale count and every spawn slip under the cap.
Bounded per API task: a single API task (the default) is held to
the cap; a multi-task API would each track only its own launches
and need a shared lease to bound the total.
"""

def __init__(
Expand Down Expand Up @@ -178,6 +195,14 @@ def __init__(
)
self._pending_lock = threading.Lock()
self._pending: set[_SubmittedRunTask] = set()
# Worker-fleet cap accounting. ``_recent_launches`` holds the
# monotonic timestamps of RunTask launches this provisioner issued
# but that ``list_tasks`` may not reflect yet; the cap counts them on
# top of the ECS-visible fleet so a concurrent burst cannot all slip
# under a stale count. ``_cap_lock`` serializes the count-and-reserve
# so concurrent deciders observe each other's reservations.
self._cap_lock = asyncio.Lock()
self._recent_launches: list[float] = []

async def request(self, *, dedup_key: str) -> list[str]:
"""Launch a worker task, returning its ARN(s).
Expand Down Expand Up @@ -259,28 +284,72 @@ async def _release_dedup_slot() -> None:
self._in_flight.pop(dedup_key, None)

try:
# Worker-fleet cap: if the fleet is already at the cap, do not
# launch another task. Return an empty ARN list rather than
# Worker-fleet cap: reserve a slot under the cap before launching.
# When the fleet is full, return an empty ARN list rather than
# raise — the caller (``_handle_overflow``) reschedules the job,
# so it stays queued and runs when an existing worker frees up.
# This bounds the worker-container count while preserving the
# queue (the admission ceiling, not this cap, bounds the queue).
token: Optional[float] = None
if self._max_workers > 0:
running = await self._current_worker_count()
if running >= self._max_workers:
logger.info(
"Worker fleet at capacity (%d/%d); not spawning for "
"%s — job stays queued until a worker frees",
running,
self._max_workers,
dedup_key,
)
token = await self._reserve_worker_slot(dedup_key)
if token is None:
return []
async with self._semaphore:
return await self._run_task()
try:
async with self._semaphore:
return await self._run_task()
except BaseException:
# The launch did not produce a worker; release the reserved
# slot so the in-flight count does not over-count a spawn
# that never happened.
if token is not None:
await self._release_worker_slot(token)
raise
finally:
await asyncio.shield(_release_dedup_slot())

async def _reserve_worker_slot(self, dedup_key: str) -> Optional[float]:
"""Reserve a worker slot under the cap, or return None when full.

Counts the ECS-visible fleet PLUS this provisioner's own recent
launches (``_recent_launches``) that ``list_tasks`` may not reflect
yet, all under ``_cap_lock`` so concurrent burst deciders observe
each other's reservations. This is what bounds a simultaneous
cold-start burst: counting only ``list_tasks`` lets every decider see
a stale ~0 and all spawn. Returns the launch token (a monotonic
timestamp) on success, or None when already at the cap.
"""
async with self._cap_lock:
now = time.monotonic()
self._recent_launches = [
t
for t in self._recent_launches
if now - t < _LAUNCH_VISIBILITY_WINDOW_S
]
ecs_visible = await self._current_worker_count()
effective = ecs_visible + len(self._recent_launches)
if effective >= self._max_workers:
logger.info(
"Worker fleet at capacity (%d running + %d in-flight "
">= %d); not spawning for %s — job stays queued until a "
"worker frees",
ecs_visible,
len(self._recent_launches),
self._max_workers,
dedup_key,
)
return None
self._recent_launches.append(now)
return now

async def _release_worker_slot(self, token: float) -> None:
"""Drop a reservation whose RunTask did not produce a worker."""
async with self._cap_lock:
try:
self._recent_launches.remove(token)
except ValueError:
pass # already aged out of the visibility window

async def _current_worker_count(self) -> int:
"""Count worker tasks whose desired status is RUNNING.

Expand Down
39 changes: 39 additions & 0 deletions tests/test_workflows/test_provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,45 @@ async def test_request_should_not_count_fleet_when_cap_disabled(self):
assert arns == ["arn:aws:ecs:::task/cluster/abc"]
assert client.list_tasks_calls == []

@pytest.mark.asyncio
async def test_request_should_cap_a_concurrent_cold_fleet_burst(self):
"""Test that a simultaneous burst from a list_tasks-blind fleet is capped.

Given:
``max_workers=2`` and a client whose ``list_tasks`` always reports
zero running tasks — modeling the ECS eventual-consistency lag
where freshly launched workers are not yet visible (the live
failure mode where every decider saw a stale 0 and all spawned).
When:
Five distinct-key requests are awaited concurrently.
Then:
Only two RunTask calls happen: the in-flight-launch accounting
bounds the burst even though ``list_tasks`` never reflects the new
workers, and the excess three requests return [] (queue, not
spawn).
"""
# Arrange — list_tasks is permanently blind (always 0 visible).
client = _FakeEcsClient(list_tasks_arns=[])
provisioner = EcsProvisioner(
cluster="c",
task_definition="worker",
subnets=["subnet-1"],
client=client,
max_workers=2,
)

# Act
results = await asyncio.gather(
*(provisioner.request(dedup_key=f"wf-{i}") for i in range(5))
)

# Assert — at most the cap launched, despite list_tasks reporting 0.
spawned = [r for r in results if r]
skipped = [r for r in results if r == []]
assert len(client.calls) == 2, f"expected 2 RunTask, got {len(client.calls)}"
assert len(spawned) == 2
assert len(skipped) == 3

@pytest.mark.asyncio
async def test_request_should_raise_retryable_when_list_tasks_fails(self):
"""Test that a list_tasks failure surfaces as a retryable error.
Expand Down
Loading