
Add bounded concurrency control for the preprocessing/indexing workflow pipeline #45

Description

@conradbzura

Bound how much preprocessing/indexing work runs at once, and shed excess load at admission. The delivered design (PR #63) consists of four cooperating layers:

  • Per-worker backpressure hook: A wool.BackpressureLike hook (TaskCountBackpressure) is wired into worker_main and worker_lan and configured via CFDB_WORKER_MAX_CONCURRENT_TASKS (default 1; 0 disables). It rejects a dispatch once the worker already has that many tasks in flight (BackpressureContext.active_task_count >= threshold). wool aborts the rejected dispatch with RESOURCE_EXHAUSTED, which the load balancer treats as transient and rotates past. This serializes the subprocess pipelines (samtools/bgzip/tabix/sort/…), so a single 1-vCPU/2-GiB worker is no longer oversubscribed.
  • Priority (leaky-bucket) load balancer: A PriorityLoadBalancer wired into the API pool offers each task to the discovered workers in the same stable order (sorted by worker uid) on every dispatch. Combined with per-worker backpressure, this concentrates load on the lowest-ordered workers. Higher-ordered workers drain to idle and self-reap (CFDB_WORKER_MAX_LIFETIME_SECONDS) instead of each carrying a thin, perpetual slice of the load. It honors wool's worker-health contract: rotate past TransientRpcError, evict on RpcError, and raise NoWorkersAvailable when no worker accepts.
  • Durable retry-to-deadline scheduler with spawn-on-overflow: Dispatch is driven by a single durable, Mongo-backed scheduler rather than by an in-request wait. Each dispatch attempt offers the task once to the existing workers via the priority LB. The routine emits an immediate "accepted" heartbeat, so the job is marked running the instant a worker takes it (before the upstream download). This bounds observed concurrency to the worker count and keeps the job out of the leasable set during its download. On overflow (every worker rejected, or none exist), the job stays pending with next_dispatch_at set. On the ECS profile, a single bounded worker spawn is also requested, which inverts the old unconditional pre-dispatch spawn. The scheduler re-attempts due jobs every CFDB_WORKFLOW_RETRY_INTERVAL_S (plus jitter) until a worker frees up or CFDB_WORKFLOW_DISPATCH_DEADLINE_S elapses, at which point the job is failed with a capacity: prefix. Because the queue lives in Mongo, it survives an API restart. An orphan-recovery sweep, run at boot and periodically, re-queues jobs that a crash left mid-flight once they pass CFDB_WORKFLOW_STALE_THRESHOLD_S. These are either a running job whose consumer died or a fresh pending claim that was never rescheduled. Recovery is therefore autonomous: a surviving replica picks the job up without a restart or a client re-request. This replaces the former single in-request CFDB_WORKFLOW_DISPATCH_WAIT_S cold-start wait, which is removed.
  • Central, Mongo-backed admission cap: ensure_workflow counts active jobs (pending + running). Once CFDB_WORKFLOW_MAX_ACTIVE is reached, it rejects new requests with 429 and a Retry-After header before claiming the workflow mutex. The cap lives in Mongo (not in-process), so it stays correct across API replicas. It is a soft cap: the count-then-claim sequence is a TOCTOU race that can briefly overshoot. The per-file workflow_key mutex is unchanged and still dedupes same-file requests. The side-effect-free /status readiness probes never dispatch and therefore never return 429.
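The per-worker backpressure decision can be sketched as follows. This sketch does not reproduce wool's real BackpressureLike protocol. BackpressureContext below is a hypothetical stand-in that models only the active_task_count field named above, and the convention that returning True means "reject" is an assumption. The environment parsing mirrors the documented default (1) and the rule that 0 disables the check.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


# Hypothetical stand-in for wool's BackpressureContext. Only the
# active_task_count field named in this issue is modeled.
@dataclass(frozen=True)
class BackpressureContext:
    active_task_count: int


class TaskCountBackpressure:
    """Reject a dispatch once `threshold` tasks are already in flight.

    A threshold of 0 disables the check, so every dispatch is accepted.
    """

    def __init__(self, threshold: int) -> None:
        if threshold < 0:
            raise ValueError("threshold must be >= 0")
        self.threshold = threshold

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "TaskCountBackpressure":
        # Default of 1 serializes the subprocess pipelines on a small worker.
        return cls(int(env.get("CFDB_WORKER_MAX_CONCURRENT_TASKS", "1")))

    def __call__(self, ctx: BackpressureContext) -> bool:
        # Assumed convention: True means "reject". The worker would then
        # surface the rejection to the caller as RESOURCE_EXHAUSTED.
        if self.threshold == 0:
            return False
        return ctx.active_task_count >= self.threshold
```

With the default threshold of 1, a worker that is already running one task rejects the next dispatch, and the load balancer moves on to the next worker in its order.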
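The overflow path and the scheduler's due-job check can be sketched as below. Several details are assumptions. The Job fields other than next_dispatch_at, the status strings, measuring the deadline from created_at, and adding jitter of up to one extra interval are all hypothetical. request_spawn is a placeholder for the ECS profile's single bounded spawn request.

```python
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional


@dataclass
class Job:
    status: str                    # hypothetical values: "pending", "running", "failed"
    created_at: datetime           # hypothetical: the deadline is measured from here
    next_dispatch_at: Optional[datetime] = None
    error: Optional[str] = None


def on_overflow(
    job: Job,
    now: datetime,
    retry_interval_s: float,
    deadline_s: float,
    request_spawn: Optional[Callable[[], None]] = None,
    rng: Callable[[], float] = random.random,
) -> Job:
    """Handle a dispatch attempt that every worker rejected (or found none)."""
    if now - job.created_at >= timedelta(seconds=deadline_s):
        job.status = "failed"
        job.error = f"capacity: no worker accepted the job within {deadline_s}s"
        job.next_dispatch_at = None
        return job
    # Stay pending. Jitter (assumed: up to one extra interval) spreads retries
    # so that many queued jobs do not hit the workers in lockstep.
    jitter = rng() * retry_interval_s
    job.next_dispatch_at = now + timedelta(seconds=retry_interval_s + jitter)
    if request_spawn is not None:
        request_spawn()  # ECS profile: ask for one bounded worker spawn
    return job


def is_due(job: Job, now: datetime) -> bool:
    """What a periodic scheduler tick would select for a re-attempt."""
    return (
        job.status == "pending"
        and job.next_dispatch_at is not None
        and job.next_dispatch_at <= now
    )
```

In the Mongo-backed version, is_due corresponds to a query for pending jobs whose next_dispatch_at is at or before the current time. Both the reschedule and the failure would be persisted as document updates, which is what lets the queue survive an API restart.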
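The admission check reduces to a count followed by a comparison. This sketch takes the count function as a parameter so that it runs without a database. With pymongo, that function could be the jobs collection's count_documents method. The status field name, the Retry-After value, and AdmissionRejected are hypothetical.

```python
from typing import Any, Callable, Mapping

# Hypothetical field name and values: "active" means pending + running.
ACTIVE_FILTER = {"status": {"$in": ["pending", "running"]}}


class AdmissionRejected(Exception):
    """Carries what the HTTP layer needs to answer 429 with Retry-After."""

    def __init__(self, retry_after_s: int) -> None:
        super().__init__(f"workflow capacity reached; retry after {retry_after_s}s")
        self.status_code = 429
        self.headers = {"Retry-After": str(retry_after_s)}


def admit(
    count_documents: Callable[[Mapping[str, Any]], int],
    max_active: int,
    retry_after_s: int = 30,  # hypothetical default
) -> None:
    """Soft admission cap, checked before claiming the workflow_key mutex.

    count_documents stands in for a Mongo count over the jobs collection,
    which keeps the cap shared across API replicas. Count-then-claim is not
    atomic: two replicas can both see max_active - 1 and both admit.
    """
    active = count_documents(ACTIVE_FILTER)
    if active >= max_active:
        raise AdmissionRejected(retry_after_s)
```

The overshoot noted in the docstring is the TOCTOU window described above. The workflow_key mutex, not this check, is what prevents duplicate work for the same file.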

Motivation

The /data and /index endpoints are unauthenticated and reachable through the internet-facing ALB. Because the in-process pool lease cap was removed, concurrency is currently bounded only by the Fargate vCPU service quota, and only indirectly through RunTask capacity errors. A single caller issuing many distinct /data requests can fan out a large number of Fargate worker tasks, each with real cost. Workers also run with backpressure=None and max_concurrent_streams=100. As a result, each worker accepts many routines at once, and their subprocess pipelines run in parallel and oversubscribe a 1-vCPU/2-GiB task. There is no central limit on concurrent workflows and no way to shed load at admission. This change defends against accidental or abusive job floods and makes concurrency, and therefore cost, predictable.

Expected Outcome

  • A configurable per-worker concurrency limit via a BackpressureLike hook, wired into worker_main, worker_lan, and the API pool; behavior unchanged when disabled (0).
  • A priority/leaky-bucket load balancer on the API pool that concentrates load so over-provisioned workers drain to idle and self-reap.
  • A configurable central cap on concurrent active workflows, enforced in ensure_workflow against the Mongo jobs collection. Once CFDB_WORKFLOW_MAX_ACTIVE is reached, it returns 429 with a Retry-After header.
  • A durable, Mongo-backed retry scheduler that keeps an over-capacity job pending and re-attempts dispatch every CFDB_WORKFLOW_RETRY_INTERVAL_S (plus jitter) until it runs or CFDB_WORKFLOW_DISPATCH_DEADLINE_S elapses; the queue survives an API restart. On the ECS profile, overflow requests one bounded worker spawn.
  • Autonomous recovery of jobs orphaned by an API or worker crash. A scheduler sweep (on boot and every tick) re-queues a running job whose consumer died, or a fresh pending claim that was never rescheduled, once it passes the stale threshold. A restarted or surviving replica then revives the job without a client re-request.
  • A job is marked running on worker acceptance (a leading routine heartbeat), bounding observed concurrency to the worker count and preventing re-lease/double-dispatch during the download.
  • Env-configurable knobs: CFDB_WORKER_MAX_CONCURRENT_TASKS, CFDB_WORKFLOW_MAX_ACTIVE, CFDB_WORKFLOW_RETRY_INTERVAL_S, CFDB_WORKFLOW_DISPATCH_DEADLINE_S.
  • Tests covering the backpressure decision, the priority LB rotation/eviction, admission rejection at the cap, the overflow→reschedule path, the retry-to-deadline failure, mark-running-on-acceptance, and autonomous orphan recovery.
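Marking a job running on acceptance depends only on message ordering. The worker's first message is the heartbeat, and the API flips the job's status when that message arrives. The sketch below models the routine as an async generator and the job as a plain dict. Both choices, along with the event names and the example URL, are hypothetical.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple


async def routine(url: str) -> AsyncIterator[Dict[str, str]]:
    # Leading heartbeat: sent the moment the worker accepts, before slow work.
    yield {"event": "accepted"}
    await asyncio.sleep(0.01)  # stand-in for the upstream download
    yield {"event": "downloaded", "url": url}
    yield {"event": "indexed"}


async def consume(
    job: Dict[str, str], stream: AsyncIterator[Dict[str, str]]
) -> List[Tuple[str, str]]:
    """Record (event, job status) pairs as the API-side consumer sees them."""
    trace: List[Tuple[str, str]] = []
    async for msg in stream:
        if msg["event"] == "accepted" and job["status"] == "pending":
            # Marked running before the download starts. The job leaves the
            # leasable set, so a scheduler tick cannot dispatch it again.
            job["status"] = "running"
        trace.append((msg["event"], job["status"]))
    return trace


if __name__ == "__main__":
    job = {"status": "pending"}
    # Hypothetical URL, used only to label the download step.
    print(asyncio.run(consume(job, routine("s3://example-bucket/sample.bam"))))
```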
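The orphan-recovery rule can be sketched as a pure predicate plus a sweep. The timestamp fields are hypothetical. Reading "a fresh pending claim that never rescheduled" as a pending job with no next_dispatch_at is an interpretation, not the confirmed schema.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Job:
    status: str                                   # "pending" or "running"
    claimed_at: datetime                          # hypothetical: when the job was claimed
    last_heartbeat_at: Optional[datetime] = None  # hypothetical: refreshed by heartbeats
    next_dispatch_at: Optional[datetime] = None


def is_orphaned(job: Job, now: datetime, stale_threshold_s: float) -> bool:
    stale = timedelta(seconds=stale_threshold_s)
    if job.status == "running":
        # Consumer died: nothing has refreshed the job within the threshold.
        last_seen = job.last_heartbeat_at or job.claimed_at
        return now - last_seen > stale
    if job.status == "pending" and job.next_dispatch_at is None:
        # Fresh claim whose owner crashed before scheduling a retry.
        return now - job.claimed_at > stale
    return False


def sweep(jobs: List[Job], now: datetime, stale_threshold_s: float) -> int:
    """Run on boot and every scheduler tick; re-queue orphans for retry."""
    requeued = 0
    for job in jobs:
        if is_orphaned(job, now, stale_threshold_s):
            job.status = "pending"
            job.next_dispatch_at = now  # due on the next tick
            requeued += 1
    return requeued
```

Against Mongo, each re-queue would more safely be a conditional update that repeats the stale condition in its filter. That way, two replicas sweeping at the same moment cannot both claim the same job.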

Out of scope (complementary, tracked separately): edge rate-limiting per IP/identity (WAF / API Gateway) and locking the API down to the web app.

Labels: enhancement (New feature or request)
