Skip to content

Add bounded concurrency control for the preprocessing/indexing workflow pipeline — Closes #45#63

Merged
conradbzura merged 19 commits into
masterfrom
45-bounded-concurrency-dispatch
Jun 23, 2026
Merged

Add bounded concurrency control for the preprocessing/indexing workflow pipeline — Closes #45#63
conradbzura merged 19 commits into
masterfrom
45-bounded-concurrency-dispatch

Conversation

@conradbzura

@conradbzura conradbzura commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

Summary

Make preprocessing/indexing concurrency and cost predictable, and shed excess at admission, with four cooperating layers: per-worker backpressure, a priority (leaky-bucket) load balancer, a durable Mongo-backed retry-to-deadline scheduler with spawn-on-overflow, and a central admission ceiling that returns 429. Dispatch shifts from "block one request waiting for a worker (and spawn one unconditionally)" to "queue durably, retry to a deadline, and spawn only on overflow" — fixing the per-request worker-spawn cost leak and bounding both concurrency and the backlog.

A job is marked running the instant a worker accepts it (a leading routine heartbeat), which bounds observed concurrency to the worker count and prevents the scheduler from re-leasing a job during its upstream download. Recovery is autonomous: a scheduler sweep (on boot and every tick) re-queues jobs a crash left mid-flight — a running job whose API consumer died, or a fresh pending claim that never rescheduled — so a restarted or surviving replica revives them without a client re-requesting the file.

Verified end-to-end on a local 3-worker stack (1 task/worker, admission ceiling 15): a 16th distinct request returns 429 Retry-After; 15 queued jobs sit pending and are retried while no worker exists, then drain to completed with max concurrent RUNNING = 3 and zero failures; and seeded in-flight orphans are recovered to completed on API boot with no requests issued.

Closes #45

Proposed changes

Per-worker backpressure

New workflows/backpressure.py (TaskCountBackpressure, a picklable wool.BackpressureLike) wired into worker_main and worker_lan, gated by CFDB_WORKER_MAX_CONCURRENT_TASKS (default 1; 0 disables). A worker at capacity rejects with RESOURCE_EXHAUSTED.

Priority (leaky-bucket) load balancer

New workflows/loadbalancer.py (PriorityLoadBalancer) wired into the API WorkerPool. Offers each task to workers in a stable uid order so load concentrates and over-provisioned workers drain to idle and self-reap; honors wool's rotate-on-transient / evict-on-RpcError / NoWorkersAvailable contract.

Durable retry-to-deadline scheduler, spawn-on-overflow, mark-running-on-acceptance

executor.py restructured: ensure_workflow dispatches a fresh claim inline; a single durable scheduler re-attempts queued jobs. Each attempt opens the stream once via the LB and marks the job running on the worker-acceptance heartbeat (before the download). On overflow the job stays pending with next_dispatch_at set and (ECS) one bounded spawn is requested; the scheduler retries every CFDB_WORKFLOW_RETRY_INTERVAL_S (+jitter) until success or CFDB_WORKFLOW_DISPATCH_DEADLINE_S (then failed capacity:). Removes the in-request _open_stream_with_retry / DISPATCH_WAIT_S wait.

Autonomous orphan recovery

lock.requeue_orphaned_dispatch + the scheduler's _recover_orphans (run at the top of every tick, including boot) re-queue jobs a crash strands that the normal lease cannot pick up — running rows whose consumer died and pending rows with no next_dispatch_at — once they pass CFDB_WORKFLOW_STALE_THRESHOLD_S. Gated on the same staleness check as claim_workflow so healthy heartbeating jobs are untouched; revives the row (partial-commit recovery reuses cached stages) rather than failing it, and makes recovery work for a surviving replica without an API restart or a client re-request.

Admission ceiling to 429

ensure_workflow counts active jobs and raises AdmissionRejected once CFDB_WORKFLOW_MAX_ACTIVE is reached; cache_stream.py maps it to 429 Retry-After. Readiness /status probes never dispatch, so never 429.

Supporting primitives, indexes, knobs, docs

lock.py (count_active_workflows, reschedule_dispatch, lease_due_dispatch, requeue_orphaned_dispatch) and models.py (next_dispatch_at, dispatch_attempts); a (status, next_dispatch_at) index in indexes.py + create-indexes.js; new env knobs in workflows/__init__.py (removing the superseded CFDB_WORKFLOW_DISPATCH_WAIT_S); README documents all four layers and adds 429 to the /data and /index status tables. main.py starts/stops the scheduler in the wool-pool context.

Test cases

# Test Suite Given When Then Coverage Target
1 TestTaskCountBackpressure A hook with threshold N active_task_count is below / at / above N Accepts below, rejects at and above Backpressure decision
2 TestTaskCountBackpressure A constructed hook It is cloudpickled round-trip Threshold and reject behavior survive the worker boundary Picklability
3 TestPriorityLoadBalancer Workers inserted out of uid order A task is dispatched The lowest-uid worker is offered first Stable priority order
4 TestPriorityLoadBalancer The priority worker rejects transiently / with RpcError A task is dispatched Rotates past the busy one / evicts the broken one Worker-health contract
5 TestPriorityLoadBalancer An empty or all-rejecting pool A task is dispatched Raises NoWorkersAvailable Overflow signal
6 TestWoolExecutorAdmission One active job and ceiling pinned to 1 ensure_workflow is awaited Raises AdmissionRejected before claiming Admission ceiling
7 TestWoolExecutorMarkRunningOnAcceptance A processor blocked before its first event A worker accepts the dispatch The job reaches running via the leading heartbeat Mark-running on acceptance
8 TestWoolExecutorWithProvisioner A worker accepts vs an overflowed attempt _attempt_dispatch runs Provisioner left idle on accept; requested once on overflow Spawn-on-overflow inversion
9 TestWoolExecutorWithProvisioner An overflow with a failing provisioner _attempt_dispatch runs Error swallowed, job stays pending and rescheduled Best-effort scale-up
10 TestWoolExecutorDispatchDeadline A queued job past its dispatch deadline _attempt_dispatch runs Failed with a capacity: prefix, no dispatch Retry-to-deadline bound
11 TestWoolExecutorScheduler A due / not-yet-due rescheduled job One scheduler tick runs Due job dispatched to completion; future job left pending Durable retry lease
12 TestWoolExecutorScheduler A scheduler whose lease query raises once The loop runs across ticks Survives the exception and ticks again Scheduler resilience
13 TestRequeueOrphanedDispatch Stale running / pending-no-next_dispatch_at vs fresh / queued / terminal rows The orphan sweep runs Only the stale non-leasable orphans are re-queued Orphan recovery gating
14 TestWoolExecutorScheduler A stale running orphan a crash left behind A scheduler tick (_recover_orphans then drain) runs The orphan is re-queued, dispatched, and reaches completed with no new request Autonomous restart recovery
15 TestServeWorkflowArtifactOrDispatch A cache miss + GET + executor raising AdmissionRejected The helper is awaited Raises HTTPException(429) with Retry-After Router 429 mapping
16 TestLeaseDueDispatch / TestRescheduleDispatch / TestCountActiveWorkflows A jobs collection with due/active rows The lock helpers run Lease oldest-first, reschedule PENDING-fenced, count active Lock primitives
17 test_lifespan_should_construct_worker_pool_with_quorum_zero The enabled-workflow lifespan Startup runs Pool built with PriorityLoadBalancer; scheduler started; drain cancels it Lifespan wiring

Introduce the configuration surface for issue #45's bounded-concurrency
control, parsed and validated at import alongside the existing workflow
runtime caps:

- CFDB_WORKER_MAX_CONCURRENT_TASKS (default 1) — per-worker backpressure
  threshold; 0 disables backpressure.
- CFDB_WORKFLOW_MAX_ACTIVE (default 1024) — admission ceiling on active
  workflows.
- CFDB_WORKFLOW_DISPATCH_DEADLINE_S (default 14400) — how long a job may
  wait for worker capacity before it is failed.
- CFDB_WORKFLOW_RETRY_INTERVAL_S (default 120) — base cadence for the
  durable dispatch-retry scheduler.

The threshold knob is consumed by the per-worker backpressure wiring; the
remaining three are consumed by the admission and durable-retry layers
that follow.
Workers ran with backpressure=None and accepted many concurrent
routines, so a single 1-vCPU/2-GiB Fargate worker oversubscribed the
samtools/bgzip/tabix/sort subprocess pipelines its routines shell out to.

Add TaskCountBackpressure, a picklable wool BackpressureLike hook that
rejects a dispatch once the worker already has
CFDB_WORKER_MAX_CONCURRENT_TASKS routines in flight (default 1, so each
worker serializes its pipeline). Rejection surfaces to the dispatcher as
gRPC RESOURCE_EXHAUSTED, which the load balancer treats as transient and
rotates past. Wire it into both worker entrypoints: worker_main builds the
hook directly, and worker_lan binds it onto the spawn factory via
functools.partial so the pool still prescribes the bind host. A threshold
of 0 passes backpressure=None, restoring the unbounded prior behavior.

The hook holds only an int and no async state so it survives wool's
serialization into the worker subprocess.
wool's default round-robin balancer spreads load evenly, so every worker
carries a thin perpetual slice of traffic and none drains to idle —
defeating worker self-reaping on an over-provisioned fleet.

Add PriorityLoadBalancer, a stateless wool LoadBalancerLike that offers
each task to the discovered workers in a stable order (sorted by uid) on
every dispatch. Combined with per-worker backpressure (one task each),
load fills the lowest-ordered workers first and higher-ordered ones drain
to idle and self-terminate on their max-lifetime — the leaky-bucket
behavior. It honors wool's worker-health contract: rotate to the next
worker on TransientRpcError (a backpressure RESOURCE_EXHAUSTED rejection),
evict on a non-transient RpcError, propagate anything else, and raise
NoWorkersAvailable when no worker accepts. Wire it into the API
WorkerPool.

Being stateless (no per-context index, no lock) keeps it trivially
picklable.
Extend the workflow job record and mutex layer for issue #45's durable
dispatch-retry and admission control, without changing dispatch behavior
yet (the executor consumes these next):

- JobRecord gains next_dispatch_at (when the retry scheduler should next
  attempt dispatch) and dispatch_attempts (deferral count). Both are
  serialized by to_mongo; the aware-datetime validator and
  _record_from_mongo treat next_dispatch_at as nullable UTC.
- lock.py gains count_active_workflows (admission count),
  reschedule_dispatch (defer a still-PENDING job, fenced on PENDING), and
  lease_due_dispatch (atomically claim one due PENDING job, pushing
  next_dispatch_at forward so concurrent scheduler ticks or API replicas
  cannot double-pick it).
@conradbzura conradbzura self-assigned this Jun 22, 2026
Address review findings on the foundational #45 primitives:

- lease_due_dispatch leases due jobs oldest-first (next_dispatch_at
  ascending) so sustained overflow cannot starve the longest-waiting job
  toward its deadline, and rejects a next_at that is not strictly in the
  future (an equal/past value would let a concurrent tick re-lease the
  same job before the attempt resolves, defeating the single-claim guard).
- reschedule_dispatch logs a debug line on a no-op fenced write, matching
  the observability convention of the module's other fenced updates.
- Correct the JobRecord.next_dispatch_at docstring: it is unset on claim,
  not set on claim.
- backpressure_for rejects a negative threshold rather than silently
  disabling backpressure, making its contract total.
lease_due_dispatch selects {status: pending, next_dispatch_at: {$lte:
now}} sorted by next_dispatch_at ascending, but the existing
status_updated_at index covers only the status prefix, so the
next_dispatch_at range/sort would scan all PENDING rows on every
scheduler tick. Add a (status, next_dispatch_at) index to
operational_index_specs and mirror it in create-indexes.js so the poll is
index-served. Adding it now, alongside the primitive that defines the
query, avoids an index migration on a hot, TTL-populated collection once
the scheduler goes live.
Pin the foundational #45 concurrency primitives flagged by review as
shipping untested:

- lease_due_dispatch: leases a due PENDING job and pushes next_dispatch_at
  forward; a second lease in the same window returns None (single-claim
  guard); skips unscheduled (None), not-yet-due, and non-PENDING jobs;
  leases oldest-due first; rejects a non-future next_at.
- reschedule_dispatch: defers a PENDING job and bumps dispatch_attempts;
  no-ops (fenced) on a non-PENDING row.
- count_active_workflows: counts pending+running, excludes terminal.
- JobRecord: next_dispatch_at/dispatch_attempts defaults, naive-datetime
  rejection, aware round-trip, and to_mongo serialization.

Extend the in-memory FakeCollection test double with $inc and a
single-key sort on find_one_and_update so these queries are exercisable
(both additive; existing tests use neither).
Address the test-coverage findings from review:

- PriorityLoadBalancer: pin cross-dispatch order stability (the
  leaky-bucket invariant, distinct from round-robin), continued in-order
  dispatch after evicting the first worker (the snapshot-iteration
  guarantee a 2-worker eviction did not exercise), and the str(uid)
  ordering key. Clarify the docstring that the uid order is
  arbitrary-but-stable, not a seniority ranking.
- backpressure_for: assert it rejects a negative threshold; give the
  protocol-conformance test teeth via the bool-return contract.
- Lifespan: assert the WorkerPool is constructed with a
  PriorityLoadBalancer (the headline wiring, previously uncovered).
- worker_lan: pin that the backpressure-bound spawn factory keeps wool's
  declares_host True, so the pool still prescribes the bind host.
…duler

Wire the dormant bounded-concurrency primitives into the executor's
request path so concurrency and cost become predictable and a dispatch
that finds no capacity queues durably instead of blocking a request.

ensure_workflow now enforces an admission ceiling: once the active
backlog (pending + running) reaches CFDB_WORKFLOW_MAX_ACTIVE it raises
AdmissionRejected before claiming the mutex, shedding a flood at the
door rather than queuing unbounded work.

A fresh claim dispatches its first attempt inline; each attempt offers
the task to existing workers once via the priority load balancer and
marks the job RUNNING only after a worker accepts, so a queued job stays
PENDING until it genuinely runs. On overflow the job is rescheduled
(staying PENDING, next_dispatch_at set) and, on the ECS profile, a single
best-effort worker spawn is requested -- inverting the old unconditional
pre-dispatch spawn. A durable, Mongo-backed scheduler re-attempts due
jobs on CFDB_WORKFLOW_RETRY_INTERVAL_S until a worker frees up or the
CFDB_WORKFLOW_DISPATCH_DEADLINE_S deadline elapses (then it fails the job
with a capacity: prefix). Because the queue lives in Mongo it survives an
API restart. A fresh claim leaves next_dispatch_at unset so the scheduler
cannot race the inline attempt.

This replaces the single in-request CFDB_WORKFLOW_DISPATCH_WAIT_S cold-
start wait, which is removed; the retry cadence plus deadline supersede
it.
Map the executor's AdmissionRejected to a 429 response with a Retry-After
header on the /data and /index dispatch path, caught explicitly before
WorkflowNotApplicable so a rejected request backs off rather than falling
through to direct upstream streaming and bypassing the bounded pipeline.
The side-effect-free /status readiness probes never dispatch, so they
never 429.

Start the durable retry scheduler in the lifespan, inside the wool pool
context so the task inherits wool's dispatch contextvars (the same
mechanism request-spawned tasks rely on). The existing drain teardown
cancels it before the pool closes.
…ntrol

Describe the four cooperating dispatch layers (per-worker backpressure,
priority load balancing, durable retry-to-deadline queue, admission
ceiling), add the new CFDB_WORKER_MAX_CONCURRENT_TASKS /
CFDB_WORKFLOW_MAX_ACTIVE / CFDB_WORKFLOW_RETRY_INTERVAL_S /
CFDB_WORKFLOW_DISPATCH_DEADLINE_S knobs, note that the removed
CFDB_WORKFLOW_DISPATCH_WAIT_S is superseded, and list 429 in the /data
and /index status-code tables.
…atch

The dispatch routine now yields an immediate heartbeat as its first event,
the instant a worker accepts the task and before the upstream download.
_attempt_dispatch marks the job RUNNING on that first event, so the job
leaves the leasable PENDING set the moment a worker takes it.

Previously the processor's first event arrived only after the download, so
a job stayed PENDING for the whole (potentially multi-second) download.
When that exceeded the retry interval the durable scheduler re-leased the
same job and dispatched a second concurrent attempt onto the same per-job
workdir, corrupting the in-flight download (observed live as "invalid
compressed data" and a source.raw.part rename race) and inflating apparent
worker concurrency. A rejected dispatch raises NoWorkersAvailable before
the routine body runs, so the leading heartbeat fires only on acceptance —
overflowed jobs are never marked RUNNING, keeping concurrency bounded to
the worker count.
Add a scheduler orphan-recovery sweep so restart recovery no longer depends
on a client re-requesting the same file.

The durable scheduler only leases PENDING jobs whose next_dispatch_at is set
and due, so two states a crash leaves behind would otherwise sit until
claim_workflow's stale-reclaim fired on a fresh same-file request: a RUNNING
row whose API consumer died mid-stream, and a fresh PENDING claim whose
inline attempt never rescheduled. requeue_orphaned_dispatch resets both to
PENDING with next_dispatch_at=now once they pass the stale threshold, and
the scheduler runs it at the top of every tick — including the first, on
boot — so the next tick re-dispatches them.

The sweep is gated on the same updated_at staleness threshold as
stale-reclaim, so a healthy in-flight job (which heartbeats) is never
yanked from its worker; partial-commit recovery means a re-dispatched job
reuses any stages already cached. Unlike stale-reclaim, which fails the row
so a new claimant supersedes it, the sweep revives the same row since there
is no new claimant.

Also fix the FakeCollection.update_many test double to report a row-level
modified_count (matching real Mongo) instead of a per-field-write count.
Address the principal-engineer review of the bounded-concurrency change
(PR #63, round 2):

- B1: make a stale RUNNING row reliably mean a dead consumer. The stream
  consumer aborts (HeartbeatLost) once its heartbeat writes have been
  failing longer than _HEARTBEAT_LOSS_ABORT_S — sized strictly between one
  heartbeat interval and the orphan sweep's stale threshold — so a
  Mongo-blind consumer stops computing (aclose cancels the worker) before
  recovery would re-dispatch it, eliminating the duplicate run rather than
  merely making it non-corrupting. Each dispatch attempt also uses a
  per-attempt workdir (job_id + nonce) so the pathological residual can't
  corrupt a sibling attempt's scratch.
- A1: clean up the per-attempt workdir on the mark_running hand-off branch
  (without releasing the successor's mutex), with a regression test.
- A3: cap leases per scheduler tick (_MAX_DISPATCHES_PER_TICK) so a backlog
  can't spawn ~MAX_ACTIVE concurrent attempts in a single tick.
- A6: pass heartbeat_interval explicitly to the routine so the value is
  resolved API-side at dispatch rather than snapshotted by cloudpickle.
- A9/A12: correct the stale WoolExecutor docstring and comments.
- A2/A4: document that the ceiling sheds attach re-GETs and that the
  dispatch deadline runs from submission and is not reset on recovery.
…eview

- A8: register the unit jobs mutex index with the production
  {active: true} partialFilterExpression (not the DocumentDB-rejected
  status-$in form), so the unit layer guards active/status lockstep drift
  the way production does.
- A15: make the FakeCollection matcher AND $and/$or with sibling top-level
  keys instead of short-circuiting on them, so a mixed query (e.g. the
  orphan-recovery filter) is modeled regardless of key order.
- A7: derive the create-indexes.js parity assertion from
  operational_index_specs() so a newly-added operational index is guarded
  automatically (now covers status_next_dispatch_at).
- A11: rename executor/lock tests to match the method __name__ convention.
A2: note that at the ceiling a re-GET for an already-active file is also
shed with 429 (it does not attach). A18: clarify the /data and /index 202
rows that the accepted job may sit pending in the durable queue under load.
Tighten the stale-reclaim invariant to a strict STALE > 2 * HEARTBEAT so
the heartbeat-loss abort window (_HEARTBEAT_LOSS_ABORT_S = max(HEARTBEAT,
STALE - HEARTBEAT)) keeps a non-zero margin above one interval; at the
former permitted boundary STALE == 2 * HEARTBEAT it collapsed to exactly
one interval, defeating the single-transient-failure tolerance the design
advertises.

Count the admission ceiling on the canonical active boolean discriminator
rather than the derived status-in-ACTIVE_STATUSES view, so the ceiling and
the mutex stay in lockstep if the two ever drift.

Emit leading operability signals: a per-tick dispatch summary and an
overflow-reschedule line, so a saturated pool is visible before jobs start
failing capacity: at the deadline.

Correct stale or overstated docstrings and comments: the HeartbeatLost
abort is documented as a best-effort optimization (it only fires on quiet
stages), with the per-attempt workdir named as the actual double-dispatch
guard; the _next_dispatch_time jitter note no longer claims a test zeroes
the constant; the load balancer dispatch return type is an async iterator,
not an async generator; the worker_lan module docstring describes the
durable-queue model instead of the removed hang-then-NoWorkersAvailable
behavior; _recover_orphans and the JobRecord.error field document the
from-submission deadline clock and the internal: error prefix.
Replace the vacuous workdir-cleanup assertions in the executor-boundary
and processor e2e tests: since the per-attempt workdir is named
job_id-uuid, a bare-job_id path is never created, so asserting its absence
passed even if scratch leaked. Assert the workdir root is empty instead.

Pin the per-tick lease cap: one _drain_due_jobs tick leases at most
_MAX_DISPATCHES_PER_TICK jobs and leaves the remainder leasable, so a
refactor dropping the thundering-herd guard fails CI.

Pin the production pool configuration over the real cloudpickle/gRPC
boundary: a two-worker pool wired with PriorityLoadBalancer and a
TaskCountBackpressure(1)-bound spawn factory distributes two workflows
across both workers and leaves the third PENDING with a next_dispatch_at,
proving rotation-on-rejection and durable reschedule end to end.

Pin the ECS worker entrypoint's backpressure wiring (worker_main.serve)
and add Hypothesis property coverage for the JobRecord datetime
aware/naive to_mongo/from_mongo round-trip (now including next_dispatch_at)
and the cloudpickle round-trip of the backpressure-bound spawn factory.

Strengthen the heartbeat-loss threshold assertion to the strict margin the
tightened invariant now guarantees, rename the scheduler-method tests to
the leading-underscore convention, and refresh the stale wool_pool fixture
docstring.
Autonomous orphan recovery preserves the original submission time, so an
orphan older than the dispatch deadline is failed capacity: on its first
recovery attempt rather than resumed. State this in the durable-queue
bullet so the autonomous-recovery promise is not read as unbounded;
committed cache artifacts still survive for a later fresh GET to reuse.
@conradbzura conradbzura marked this pull request as ready for review June 23, 2026 16:09
@conradbzura conradbzura merged commit 1d22299 into master Jun 23, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add bounded concurrency control for the preprocessing/indexing workflow pipeline

1 participant