From 9425026eeaaaf086d3f36b8b23012cda8607867d Mon Sep 17 00:00:00 2001 From: "pinnamaraju.swaroop" Date: Tue, 21 Apr 2026 15:35:46 +0530 Subject: [PATCH] docs: event-driven lead dispatch design and endpoint reference fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a design document set under docs/event-driven-dispatch/ describing the proposed Redis-backed event-driven replacement for the cron-based backlog picker. Covers motivation, architecture (ingest, delayed ZSET, leader-elected promoter, sharded workers, channel semaphore, control plane reconcilers), data model, failure modes and recovery, phased dual-write migration, and operations runbooks. Also corrects stale /push/lead/v2 references in CLAUDE.md and BREEZE_BUDDY_ARCHITECTURE.md — the current endpoint is POST /leads, handled in app/api/routers/breeze_buddy/leads/. No code changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 2 +- docs/BREEZE_BUDDY_ARCHITECTURE.md | 27 +- docs/event-driven-dispatch/01-motivation.md | 84 +++++ docs/event-driven-dispatch/02-architecture.md | 296 ++++++++++++++++++ docs/event-driven-dispatch/03-data-model.md | 199 ++++++++++++ docs/event-driven-dispatch/04-reliability.md | 213 +++++++++++++ docs/event-driven-dispatch/05-migration.md | 157 ++++++++++ docs/event-driven-dispatch/06-operations.md | 235 ++++++++++++++ docs/event-driven-dispatch/README.md | 43 +++ 9 files changed, 1243 insertions(+), 13 deletions(-) create mode 100644 docs/event-driven-dispatch/01-motivation.md create mode 100644 docs/event-driven-dispatch/02-architecture.md create mode 100644 docs/event-driven-dispatch/03-data-model.md create mode 100644 docs/event-driven-dispatch/04-reliability.md create mode 100644 docs/event-driven-dispatch/05-migration.md create mode 100644 docs/event-driven-dispatch/06-operations.md create mode 100644 docs/event-driven-dispatch/README.md diff --git a/CLAUDE.md b/CLAUDE.md index 19dfc6843..c7ac5b509 100644 --- 
a/CLAUDE.md +++ b/CLAUDE.md @@ -104,7 +104,7 @@ Breeze Buddy is the template-driven telephony agent. These patterns MUST be foll - Template types defined in `breeze_buddy/template/types.py` -- this is the source of truth for all template models ### Lead Processing Flow -1. Lead inserted via `/push/lead/v2` -> validated -> stored as BACKLOG +1. Lead inserted via `POST /leads` -> validated -> stored as BACKLOG 2. Cron job picks up backlog leads 3. Pre-checks run (optional external API validation) 4. Call initiated via telephony provider (Twilio/Plivo/Exotel) diff --git a/docs/BREEZE_BUDDY_ARCHITECTURE.md b/docs/BREEZE_BUDDY_ARCHITECTURE.md index a6b1de502..adfb78a49 100644 --- a/docs/BREEZE_BUDDY_ARCHITECTURE.md +++ b/docs/BREEZE_BUDDY_ARCHITECTURE.md @@ -38,7 +38,7 @@ Breeze Buddy is a template-based voice agent system built on top of Pipecat for ┌─────────────────────────────────────────────────────────────────────────┐ │ API Layer (FastAPI) │ ├─────────────────────────────────────────────────────────────────────────┤ -│ /push/lead/v2 │ /template (GET/POST) │ /websocket │ +│ POST /leads │ /template (GET/POST) │ /websocket │ │ - Lead insertion │ - Template CRUD │ - Call handling │ │ - Payload validation │ - Schema validation │ - WebSocket connect │ └────────────┬───────────────────────┬──────────────────────┬─────────────┘ @@ -71,7 +71,7 @@ Breeze Buddy is a template-based voice agent system built on top of Pipecat for ### 1. Lead Insertion Flow -Leads are inserted through the `/push/lead/v2` endpoint: +Leads are inserted through the `POST /leads` endpoint: **Request Model** ([types/models.py:15-20](app/ai/voice/agents/breeze_buddy/types/models.py#L15-L20)): ```python @@ -83,7 +83,7 @@ class PushLeadRequest(BaseModel): reporting_webhook_url: str | None = None # Callback URL ``` -**Insertion Process** ([leads.py:153-275](app/api/routers/breeze_buddy/leads.py#L153-L275)): +**Insertion Process** ([handlers.py:97](app/api/routers/breeze_buddy/leads/handlers.py#L97)): 1. 
**Template Retrieval**: Fetch template by reseller, identifier, and name ```python @@ -604,12 +604,12 @@ async def my_handler(context, flow_manager, args): ### API Layer #### 1. Leads Router -**Location**: [api/routers/breeze_buddy/leads.py](app/api/routers/breeze_buddy/leads.py) +**Location**: [api/routers/breeze_buddy/leads/](app/api/routers/breeze_buddy/leads/) **Endpoints**: -##### `POST /push/lead/v2` -Inserts new lead for processing ([leads.py:153-275](app/api/routers/breeze_buddy/leads.py#L153-L275)) +##### `POST /leads` +Inserts new lead for processing ([handlers.py:97](app/api/routers/breeze_buddy/leads/handlers.py#L97)) **Request**: ```json @@ -642,11 +642,14 @@ Inserts new lead for processing ([leads.py:153-275](app/api/routers/breeze_buddy - Checks template exists for reseller - Verifies call execution config exists -##### `GET /lead/{lead_id}` -Retrieves lead by ID ([leads.py:25-55](app/api/routers/breeze_buddy/leads.py#L25-L55)) +##### `GET /leads/{lead_id}` +Retrieves lead by ID ([__init__.py:99](app/api/routers/breeze_buddy/leads/__init__.py#L99)) -##### `POST /{reseller}/{template}` -Legacy endpoint for order confirmation ([leads.py:58-150](app/api/routers/breeze_buddy/leads.py#L58-L150)) +##### `DELETE /leads/{lead_id}` +Aborts a lead by ID ([__init__.py](app/api/routers/breeze_buddy/leads/__init__.py)) + +##### `GET /leads/recording/{call_sid}` +Streams the call recording audio ([__init__.py](app/api/routers/breeze_buddy/leads/__init__.py)) #### 2. Template Router **Location**: [api/routers/breeze_buddy/template.py](app/api/routers/breeze_buddy/template.py) @@ -804,7 +807,7 @@ Plays audio (e.g., dial tone, hold music). ``` 1. Lead Insertion - ├─> POST /push/lead/v2 + ├─> POST /leads ├─> Validate payload against template schema ├─> Get call execution config └─> Insert into lead_call_tracker table @@ -1163,7 +1166,7 @@ Have a good day." 
### Payload Example -**Input Payload** (from `/push/lead/v2`): +**Input Payload** (from `POST /leads`): ```json { "payload": { diff --git a/docs/event-driven-dispatch/01-motivation.md b/docs/event-driven-dispatch/01-motivation.md new file mode 100644 index 000000000..d5892afb3 --- /dev/null +++ b/docs/event-driven-dispatch/01-motivation.md @@ -0,0 +1,84 @@ +# 01 — Motivation + +## What the current system does + +1. `POST /leads` (`app/api/routers/breeze_buddy/leads/__init__.py:42`) inserts a row into `lead_call_tracker` with `status=BACKLOG` and `next_attempt_at = now() + initial_offset`. +2. An external cron pings `/cron/initiate` (`app/api/routers/breeze_buddy/cron.py:16`) every 30 seconds. +3. That handler calls `process_backlog_leads` (`app/ai/voice/agents/breeze_buddy/managers/calls.py:439`), which runs: + + ```sql + SELECT * FROM lead_call_tracker + WHERE status = 'BACKLOG' + AND next_attempt_at <= NOW() + AND is_locked = FALSE + AND execution_mode IN ('TELEPHONY', 'TELEPHONY_TEST'); + ``` + +4. It iterates the result set. For each row it tries to atomically acquire a per-row lock (`is_locked=TRUE` with `expected_status=BACKLOG`), runs pre-checks, acquires a telephony number (atomically incrementing `outbound_number.channels`), and calls `provider.make_call(...)`. On failure it may create a new BACKLOG row with `attempt_count+1`. + +## The concrete failure mode + +Consider a volume day: 500 eligible leads at t=0, serial per-lead processing takes ~350ms each (pre-checks + provider call setup), so the full batch takes ~3 minutes. 
+
+```
+t=0s    cron fires -> SELECT returns 500 rows -> iterating starts
+t=30s   cron fires -> SELECT returns ~415 rows (~85 now PROCESSING or locked)
+        -> skips all locked rows, but the scan still happened
+t=60s   cron fires -> SELECT returns ~330 rows -> same scan, same skip
+t=90s   cron fires -> SELECT returns ~245 rows -> same scan, same skip
+t=120s  cron fires -> SELECT returns ~160 rows -> same scan, same skip
+t=150s  cron fires -> SELECT returns ~75 rows  -> same scan, same skip
+t=180s  original batch finishes; new leads have arrived in the meantime
+```
+
+**Six full scans of the backlog have happened during a single batch.** (At ~350ms per lead, roughly 85 leads move to PROCESSING between ticks.) Each scan walks the `(status, next_attempt_at)` index across every BACKLOG row. The scans themselves aren't wasted (they correctly skip locked rows), but they are not free:
+
+- Index walks and row-visibility checks on an ever-growing table.
+- Postgres lock contention rises as every scan re-visits the same rows.
+- `pg_stat_activity` shows multiple long-running transactions from the overlapping cron handlers.
+- If multiple pods run the cron, the serialization is worse — one pod wins the Redis lock; the other pods' HTTP timeouts cascade.
+- As ingestion outpaces processing, the scan set grows, each scan takes longer, and processing falls further behind. Unbounded.
+
+This is the textbook polling-at-the-limit problem. The cron is both the clock and the dispatcher; as the dispatcher slows, the clock keeps ticking, and the scans pile up.
+
+## Secondary pain points
+
+### Latency floor is 30s, ceiling is unbounded
+
+A lead scheduled for `t=T+1s` waits up to 29s for the next tick, then queues behind whatever the batch is chewing on. SLA-sensitive campaigns (order-confirmation, intent-to-buy follow-up) suffer visibly when the backlog grows.
+
+### Rate-limited leads re-picked forever
+
+Today, when a customer hits the rate limit in `call_limiter.py`, the lead stays BACKLOG. Every subsequent cron tick picks it up, fails again, alerts again.
The code explicitly acknowledges this (TODO at `calls.py:584`). + +### All-or-nothing batch + +`process_backlog_leads` has no batch cap. It pulls every eligible row and iterates serially. A one-time surge becomes a request-timing-out HTTP call and a 5-minute outage window while the first batch drains. + +### Channel-capacity logic is DB-coupled + +Each call does an atomic `increment_outbound_number_channels` at dispatch time and a corresponding decrement on call-end. This is correct, but it means channel-capacity enforcement is not decoupled from the dispatcher — there is no way to "wait for a channel" other than to pick the lead, fail to acquire, and try again on the next tick. + +### Horizontal scale is a lie + +Running more pods does not make the cron faster. The scheduler's Redis lock means exactly one pod runs `process_backlog_leads` per tick. Adding pods adds idle capacity, not throughput. + +### Retries don't know about recent attempts + +Because retries are new BACKLOG rows, the only knobs are `next_attempt_at` and `attempt_count`. There is no per-customer or per-number throttle on the retry path — if a number is currently failing on every call, retries blindly pile back into the same number. + +## Requirements for the replacement + +Derived directly from the problems above: + +1. **No hot-path polling.** Lead dispatch must not do a full-table or even index-bounded SELECT on every tick. +2. **Sub-second dispatch latency** from `next_attempt_at` to provider `make_call`. +3. **The telephony channel count is the only throughput limiter.** Not cron frequency, not worker count, not DB contention. +4. **Horizontally scalable.** Adding pods must add throughput up to the channel ceiling. +5. **Rate-limited leads must not busy-loop** — they must be rescheduled to a time when they can actually run. +6. **Bounded per-tick work.** A surge of 100k leads must not break the system; it must drain at channel capacity. +7. 
**Correct under failure.** Redis crashes, worker crashes, provider webhook losses must not create stuck leads or duplicate calls. +8. **DB remains source of truth.** Redis is a cache + dispatch fabric, never authoritative for lead state. +9. **Migration must be non-disruptive.** No flag day. Per-reseller cutover with rollback. + +Every design decision in the following documents maps back to one of these nine requirements. diff --git a/docs/event-driven-dispatch/02-architecture.md b/docs/event-driven-dispatch/02-architecture.md new file mode 100644 index 000000000..157441965 --- /dev/null +++ b/docs/event-driven-dispatch/02-architecture.md @@ -0,0 +1,296 @@ +# 02 — Architecture + +The system is split into five planes. Each plane has a single responsibility; failures in one plane are contained by the others. + +``` + ┌──────────────────────────────────────┐ + │ PLANE 1 - INGEST (HTTP-facing) │ + client ─────► POST /leads ──────► insert into lead_call_tracker │ + │ ZADD schedule:leads │ + │ return 201 │ + └────────────────┬─────────────────────┘ + │ + ▼ + ┌──────────────────────────────────────┐ + │ PLANE 2 - TIME SCHEDULING │ + │ Redis ZSET: schedule:leads │ + │ score = next_attempt_at_unix_ms │ + │ member = lead_id │ + └────────────────┬─────────────────────┘ + │ + ▼ + ┌──────────────────────────────────────┐ + │ PLANE 3 - PROMOTION │ + │ Leader-elected promoter │ + │ every 200ms: │ + │ ZRANGEBYSCORE 0 now LIMIT 0 500 │ + │ for each id: │ + │ ZREM schedule:leads id │ + │ LPUSH ready:leads:{shard} │ + └────────────────┬─────────────────────┘ + │ + ▼ + ┌────────────────────────────────────────┴────────────────────────────────────────┐ + │ PLANE 4 - DISPATCH │ + │ │ + │ ready:leads:0 ready:leads:1 ... ready:leads:15 │ + │ │ │ │ │ + │ ▼ ▼ ▼ │ + │ workers[shard 0] workers[shard 1] ... 
workers[shard 15] │ + │ │ │ + │ ▼ │ + │ BLPOP lead_id │ + │ row lock (existing) │ + │ pre-checks │ + │ pick outbound number │ + │ BLPOP channel:{number_id} ◄── capacity gate │ + │ provider.make_call(...) │ + │ UPDATE status -> PROCESSING │ + │ │ + └─────────────────────────────────────────┬───────────────────────────────────────┘ + │ + webhook ──►│── LPUSH channel:{number_id} token + │ + ▼ + ┌──────────────────────────────────────┐ + │ PLANE 5 - CONTROL PLANE │ + │ (existing BackgroundTaskScheduler) │ + │ │ + │ reconcile_backlog_to_zset 60s │ + │ reap_stuck_processing_lists 30s │ + │ reconcile_channel_tokens 60s │ + │ clean_stale_bb_locks 300s │ + └──────────────────────────────────────┘ +``` + +## Plane 1 — Ingest + +**Responsibility:** accept lead pushes and make them visible to the scheduler. + +**Touchpoint:** `POST /leads` (`app/api/routers/breeze_buddy/leads/__init__.py:42`). + +**Behavior:** + +1. Validate payload against template schema (unchanged). +2. Insert into `lead_call_tracker` with `status=BACKLOG`. +3. Emit the scheduling event: `ZADD schedule:leads `. +4. Return 201 with the tracker id. + +Steps 2 and 3 are **not** atomic. If the pod dies between step 2 and step 3, the lead is in BACKLOG without an event. The boot-time and periodic reconciler (Plane 5) will detect this and emit the missing event within 60 seconds. This is an acceptable SLA for the rare crash window; see [04-reliability.md](04-reliability.md). + +Reasons we don't use an outbox pattern: +- It would add a second DB write and a dedicated relay process. +- The reconciler already provides at-least-once delivery. +- Duplicate enqueue is harmless (workers are idempotent via the existing `acquire_lock_on_lead_by_id` atomic UPDATE). + +## Plane 2 — Time Scheduling + +**Responsibility:** hold all scheduled leads in time order. + +**Structure:** one Redis sorted set. 
+ +| | | +|---|---| +| key | `schedule:leads` | +| type | ZSET | +| score | `next_attempt_at` as Unix milliseconds (double) | +| member | lead id (string) | + +**Why ZSET and not alternatives:** + +- **Redis Streams** have no native delayed-delivery. Implementing delay on a Stream means time-bucketed streams, a scanner that rotates buckets, and mid-second precision workarounds. That is a worse ZSET. +- **Keyspace notifications on TTL'd keys** are lossy. Subscribers that disconnect lose events. The docs explicitly state keyspace notifications are best-effort. Not acceptable for money-adjacent dispatch. +- **External libraries (arq, rq-scheduler)** add a framework, a second worker model, and a dependency. We already have asyncpg + Redis; we don't need more. +- **ZSET** gives O(log N) insert, O(log N + K) range pop, trivial re-scheduling (same `ZADD` overwrites), and natural ordering. It's the right primitive. + +**Sizing:** the ZSET contains only leads whose firing time has not yet passed. Its steady-state size is bounded by *ingestion rate × average scheduling delay*, not by cumulative lead history. If you ingest 1000 leads/minute with an average 5-minute initial offset, steady state is ~5000 members. Trivial for Redis. + +**Never shard this ZSET.** It is the global time view. Sharding it would require a promoter per shard, which multiplies leader-election complexity without solving a real problem. Redis handles millions of ZSET members without breaking a sweat. + +## Plane 3 — Promotion + +**Responsibility:** move leads from "scheduled for later" to "ready to dispatch" at the right moment. + +**Implementation:** a single leader-elected async task that runs in every pod. Only the leader acts. + +**Leader election:** Redlock pattern on `promoter:leader` with 5s TTL, renewed every 2s. Standard algorithm. On leader death, another pod takes over within ~5s. 
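The acquire/renew cycle above can be sketched in a few lines. This is a behavioral model only: `FakeRedis`, `try_acquire_leadership`, and `renew_leadership` are illustrative names, and the in-memory class stands in for the two Redis commands involved (`SET ... NX EX` and `GET`), not for any real client.

```python
import time


class FakeRedis:
    """In-memory stand-in for the two Redis commands the leader lock needs."""

    def __init__(self):
        self.store = {}  # key -> (value, expires_at)

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        current = self.store.get(key)
        if nx and current is not None and current[1] > now:
            return None  # key still alive -> SET NX fails, caller is not leader
        self.store[key] = (value, now + (ex if ex is not None else float("inf")))
        return True

    def get(self, key):
        current = self.store.get(key)
        if current is not None and current[1] > time.monotonic():
            return current[0]
        return None  # missing or TTL-expired


def try_acquire_leadership(r, pod_id, ttl_s=5):
    # SET bb:promoter:leader <pod_id> NX EX 5 -- exactly one candidate wins
    return r.set("bb:promoter:leader", pod_id, nx=True, ex=ttl_s) is True


def renew_leadership(r, pod_id, ttl_s=5):
    # Renew only while we still hold the lock. The real renewal should be a
    # single Lua compare-and-set; this check-then-set sketch has a narrow race.
    if r.get("bb:promoter:leader") == pod_id:
        r.set("bb:promoter:leader", pod_id, ex=ttl_s)
        return True
    return False
```

On leader death the renewals stop, the key expires after the 5s TTL, and the next candidate's `SET NX` succeeds — which is the ~5s takeover window stated above.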
+ +**Tick loop (leader only):** + +``` +every PROMOTER_TICK_MS (default 200): + now = current_unix_ms() + ids = ZRANGEBYSCORE schedule:leads 0 now LIMIT 0 PROMOTER_BATCH (default 500) + if ids is empty: + continue + # Single Lua script for the move: + for id in ids: + if ZREM schedule:leads id == 1: + LPUSH ready:leads:{hash(id) % SHARD_COUNT} id +``` + +**Why Lua for the move:** not for cross-command atomicity (not needed — the `ZREM` result is the guard against double-dispatch), but for network efficiency. Moving 500 leads in one `EVAL` cuts 500 RTTs to 1. At promoter tick rates, this is the difference between a cheap promoter and a Redis CPU hot-spot. + +**Why 200ms:** fast enough for sub-second dispatch SLA, slow enough to not burn CPU when idle. Tunable. + +**Why a bounded batch (500):** caps work per tick. If 100k leads become due simultaneously (e.g. after a DB maintenance window), the promoter drains them at `500 × 5/sec = 2500/sec`, not in a single blocking scan. + +**Why leader-elected and not per-pod:** the promoter's only job is `ZREM + LPUSH`. Running it on multiple pods just means redundant `ZRANGEBYSCORE` calls that find nothing (because the leader already `ZREM`'d). Single-leader is simpler and cheaper. + +**Sharding into `ready:leads:{shard}`:** + +- `SHARD_COUNT` is a small constant (default 16). Fixed at deploy time. +- Shard key = `hash(reseller_id) % SHARD_COUNT` (lives on the lead row, resolved via the promoter's in-memory cache). +- Per-tenant isolation: one noisy reseller fills its own shards; other shards flow freely. +- Number of workers per shard is configurable. High-volume resellers get more workers on their shards. + +## Plane 4 — Dispatch + +**Responsibility:** pick a lead, acquire a channel, make the call. + +Every pod runs K async worker tasks per shard it handles. Workers are long-lived asyncio tasks, not processes. + +**Worker loop:** + +``` +while running: + # 1. 
Receive an event - blocking, not polling + lead_id = BLPOP ready:leads:{my_shard} timeout=30s + if no lead_id: continue # cycle for shutdown signal + + # 2. Move to reliability list for crash recovery + RPUSH processing:leads:{worker_uuid} lead_id + + try: + # 3. Load authoritative state from DB + lead = SELECT ... FROM lead_call_tracker WHERE id = lead_id + if lead.status != BACKLOG: + continue # already handled (e.g. cancelled) + + # 4. Acquire row lock (existing atomic UPDATE) + if not acquire_lock_on_lead_by_id(lead_id, expected_status=BACKLOG): + continue # someone else owns it + + # 5. Pre-checks (unchanged) + if not pass_pre_checks(lead): + finalize_with_precheck_failure(lead) + continue + + # 6. Calling-hours guard + if outside_calling_hours(lead): + ZADD schedule:leads lead_id + release_row_lock(lead) + continue + + # 7. Pick outbound number + number = pick_outbound_number(lead) + if not number: + ZADD schedule:leads lead_id + release_row_lock(lead) + continue + + # 8. Capacity gate + token = BLPOP channel:{number.id} timeout=10s + if not token: + # All channels on this number are busy right now + ZADD schedule:leads lead_id + release_row_lock(lead) + continue + + # 9. Per-customer rate limit (existing sliding window) + if rate_limited(lead.phone): + LPUSH channel:{number.id} token + ZADD schedule:leads lead_id + release_row_lock(lead) + continue + + # 10. 
Make the call
+    try:
+        call_sid = provider.make_call(number, lead)
+        UPDATE lead_call_tracker
+          SET status=PROCESSING, call_id=call_sid
+          WHERE id=lead_id AND status=BACKLOG
+        # token stays held - released by call-end webhook
+    except Exception:
+        LPUSH channel:{number.id} token
+        ZADD schedule:leads lead_id
+        release_row_lock(lead)   # free the row so the next pick can acquire it
+
+    finally:
+        LREM processing:leads:{worker_uuid} 1 lead_id
+```
+
+### The channel semaphore
+
+Each `outbound_number` row with `maximum_channels = M` has a Redis list:
+
+| | |
+|---|---|
+| key | `channel:{outbound_number_id}` |
+| type | LIST |
+| length at idle | M |
+| member | opaque token string (generated at startup, content does not matter) |
+
+**On startup:** for each active outbound number, `DEL channel:{id}` then `RPUSH channel:{id} token1 token2 ... tokenM`.
+
+**On dispatch:** worker does `BLPOP` — this blocks until a token is available or the timeout expires.
+
+**On call end** (telephony callback handler at `app/api/routers/breeze_buddy/telephony/callbacks/`): `LPUSH channel:{id} token`.
+
+**Why not use the `outbound_number.channels` DB counter directly:**
+
+- A `BLPOP` is O(1) and blocks cheaply. A DB counter needs `SELECT ... FOR UPDATE` (row lock) or optimistic `UPDATE ... WHERE channels < maximum_channels` (then retry on zero rows). Both are slower and create DB contention.
+- The worker already needs to decide "wait or skip" on capacity. `BLPOP` with a timeout gives that choice for free.
+- The DB counter doesn't disappear — it stays as the eventually-consistent view for operators and for the reconciler that detects token leaks. But the *hot path* uses Redis.
+
+This is the single most important design decision in the system. Re-read this section if nothing else.
+
+### Worker scaling
+
+Total concurrent call attempts in the system = sum of workers waiting at step 8 (`BLPOP channel:...`). This is naturally bounded by `sum(M across active numbers)`. Running more workers doesn't cause more calls than channels allow — excess workers just sit in `BLPOP`.
+ +This is why the earlier requirement "telephony channel count is the only throughput limiter" holds: workers can be oversubscribed freely. + +## Plane 5 — Control Plane + +**Responsibility:** heal Redis-vs-DB drift, clean up stuck state, emit metrics. + +**Implementation:** the existing `BackgroundTaskScheduler` (`app/core/background_tasks/scheduler.py`). This framework was purpose-built for slow, single-pod, periodic chores. It fits the control plane exactly. + +Registered tasks: + +| Task | Interval | Purpose | +|---|---|---| +| `reconcile_backlog_to_zset` | 60s | Scan BACKLOG rows not present in `schedule:leads`; re-enqueue them. Heals lost ingest events and worker crashes before `ZADD`-on-retry. | +| `reap_stuck_processing_lists` | 30s | Scan `processing:leads:*` for entries held > 5 min. Re-ZADD to scheduler with `now`. Checks DB state first to avoid re-dispatching a lead that's already PROCESSING. | +| `reconcile_channel_tokens` | 60s | For each active outbound number, compute `M - in_flight_calls_from_db` vs `LLEN channel:{id}`. Top up leaked tokens. | +| `clean_stale_bb_locks` | 300s | Free rows with `is_locked=TRUE` and `locked_at > 10 min`. Last-line defense against stuck locks. | + +**What is NOT in the scheduler:** + +- The promoter. Its 200ms tick is far below the scheduler's 60s minimum loop interval. Promoter runs as a dedicated asyncio task on startup. +- The workers. They are always-on consumers, not periodic tasks. +- Channel semaphore initialization. Runs once on startup, not periodically. + +See [04-reliability.md](04-reliability.md) for the exact SQL of each reconciler and the failure case each one covers. 
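The Plane 4 capacity gate can be modeled in-process with `asyncio.Queue` standing in for the `channel:{number_id}` list (`wait_for(queue.get(), timeout)` plays the role of `BLPOP` with a timeout). This is a behavioral sketch of the blocking semantics, not real Redis client code; the names and the 0.05s timeout are illustrative.

```python
import asyncio


async def demo(max_channels: int, attempts: int):
    # bb:channel:{number_id} modeled as a queue pre-filled with M tokens
    channel = asyncio.Queue()
    for i in range(max_channels):
        channel.put_nowait(f"token-{i}")

    async def dispatch(lead_id: int) -> str:
        try:
            # BLPOP channel:{number.id} timeout=... -> wait for a free channel
            await asyncio.wait_for(channel.get(), timeout=0.05)
        except asyncio.TimeoutError:
            return "rescheduled"  # real worker would ZADD back to schedule:leads
        # token is held here until the call-end webhook would LPUSH it back
        return "dispatched"

    return await asyncio.gather(*(dispatch(i) for i in range(attempts)))


results = asyncio.run(demo(max_channels=2, attempts=3))
```

With two tokens and three concurrent attempts, two dispatch immediately and the third blocks until its timeout and reschedules — exactly the "wait briefly, then yield" behavior workers get from `BLPOP`.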
+ +## Component placement in the codebase + +Proposed layout (for implementation reference): + +``` +app/ai/voice/agents/breeze_buddy/dispatch/ + __init__.py + queue.py # ZADD / ZRANGEBYSCORE / ZREM helpers + promoter.py # leader election + tick loop + worker.py # BLPOP -> dispatch flow + channel_semaphore.py # token bucket per outbound number + reconcilers.py # functions registered on BackgroundTaskScheduler + shards.py # shard resolution helpers +``` + +Touchpoints in existing code: + +- `app/api/routers/breeze_buddy/leads/handlers.py:97` (`push_lead_handler`) — add ZADD after insert. +- `app/api/routers/breeze_buddy/telephony/callbacks/` — release channel token on call-end. +- `app/main.py` lifespan — start promoter task, initialize channel semaphores, register control-plane tasks on the scheduler. +- `app/ai/voice/agents/breeze_buddy/managers/calls.py:439` (`process_backlog_leads`) — kept during migration, deleted at the end. diff --git a/docs/event-driven-dispatch/03-data-model.md b/docs/event-driven-dispatch/03-data-model.md new file mode 100644 index 000000000..f18b008bf --- /dev/null +++ b/docs/event-driven-dispatch/03-data-model.md @@ -0,0 +1,199 @@ +# 03 — Data Model + +## Redis keys + +All keys use the prefix `bb:` (breeze buddy) to namespace away from other Redis uses in the app. + +| Key | Type | Purpose | Produced by | Consumed by | Eviction | +|---|---|---|---|---|---| +| `bb:schedule:leads` | ZSET | Delayed queue of leads awaiting dispatch. Score = `next_attempt_at` in Unix ms. Member = lead id. | Ingest (`POST /leads`), Worker retry, Reconciler | Promoter | Never (members removed by `ZREM` on promotion or `ZREM` on cancel) | +| `bb:ready:leads:{shard}` | LIST | Ready-to-dispatch leads for a shard. FIFO. | Promoter | Worker `BLPOP` | Never (drained by workers) | +| `bb:processing:leads:{worker_uuid}` | LIST | In-flight leads held by a single worker for crash recovery. 
| Worker on pick | Worker on finish, Reaper on stuck | Never (normal drain + reaper) | +| `bb:channel:{outbound_number_id}` | LIST | Channel-capacity semaphore. Each member is a token; `LLEN` == available channels. | Startup init, Call-end webhook, Error paths | Worker `BLPOP` before `make_call` | Never (rebuilt from DB by reconciler) | +| `bb:promoter:leader` | STRING | Leader-election lock. Value = pod instance id. TTL 5s, renewed every 2s. | Candidate promoters via `SET NX EX` | Leader itself for heartbeat | TTL | +| `bb:promoter:paused` | STRING | Emergency stop flag. If exists, promoter skips its tick. | Operators (runbook) | Promoter | Manual | +| `bb:reseller:paused:{reseller_id}` | STRING | Per-reseller pause. | Operators | Workers (skip + re-ZADD) | Manual | + +Existing keys (referenced for completeness, unchanged): + +| Key | Notes | +|---|---| +| `greeting:{lead_id}` | Pre-computed greeting audio. Untouched. | +| `breeze_buddy:outbound_rate_limit:{token}` | Per-customer sliding window. Used by workers before `make_call`. | +| `background:task:{name}:lock` | Existing `BackgroundTaskScheduler` locks. Used for control-plane tasks. | + +## DB schema changes + +Minimal. The `lead_call_tracker` table already has everything the new system needs (`id`, `status`, `next_attempt_at`, `is_locked`, `attempt_count`, `reseller_id`). No new columns required for dispatch itself. + +**Possible additions** (optional, for observability and for better reconciler queries): + +| Column | Type | Purpose | +|---|---|---| +| `lead_call_tracker.dispatched_at` | TIMESTAMP NULL | Set when worker picks the lead off the ready list. Lets us measure event-path latency end-to-end. | +| `lead_call_tracker.shard` | SMALLINT NULL | Materialized shard assignment. Avoids repeated `hash(reseller_id)` computation and makes shard re-balancing observable. | + +If added, do it as a new sequential migration (e.g. `026_event_dispatch_columns.sql`) per the project's migration rule. 
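One detail worth pinning down if the `shard` column is added: the shard function must be stable across processes and restarts, and Python's built-in `hash()` is salted per process (`PYTHONHASHSEED`). A sketch using a deterministic hash instead — `shard_for` is an illustrative name, not existing code:

```python
import zlib

SHARD_COUNT = 16  # matches the default in 02-architecture.md


def shard_for(reseller_id: str) -> int:
    # crc32 is deterministic across processes, unlike the salted built-in hash()
    return zlib.crc32(reseller_id.encode("utf-8")) % SHARD_COUNT
```

Any stable hash works; the only hard requirement is that the promoter and every worker pod agree on the same mapping for the lifetime of a lead.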
+ +**No new tables.** The ZSET, ready lists, and processing lists live entirely in Redis. + +## Lead state machine + +Existing states (`app/schemas/breeze_buddy/core.py:33`) are sufficient. The transitions change. + +``` + ┌──────────────────────────────┐ + │ BACKLOG │ + │ (row exists, may or may not │ + │ be in schedule:leads ZSET) │ + └───────┬────────────────┬─────┘ + │ │ + promoter │ │ cancel + + worker │ │ + pick │ │ + ▼ ▼ + ┌──────────────┐ ┌───────────┐ + │ PROCESSING │ │ FINISHED │ + │ (call_id │ │ (CANCELLED)│ + │ set) │ └───────────┘ + └───────┬──────┘ + │ + webhook │ + or timeout │ + │ + ┌───────▼──────┐ + │ FINISHED │ ── if attempts remaining ──► new BACKLOG row + ZADD + │ (terminal │ (existing retry pattern) + │ outcome) │ + └──────────────┘ +``` + +Key invariants: + +1. **BACKLOG rows may or may not have a corresponding ZSET entry.** The reconciler ensures eventual consistency. +2. **PROCESSING rows are never in the ZSET.** The worker does `ZREM`-implicit (by consuming from the ready list) before transitioning. +3. **FINISHED is terminal for that row.** Retries create new rows. +4. **`is_locked=TRUE` always implies a worker holds it.** Reaper clears it after timeout. + +## Channel semaphore lifecycle + +``` + startup make_call OK call_end_webhook + │ │ │ + outbound_number │ │ + M = maximum_channels │ │ + │ │ │ + ▼ ▼ ▼ + DEL bb:channel:{id} token consumed (stays LPUSH bb:channel:{id} + RPUSH token x M with in-flight call) token + │ + │ LLEN = M (idle state, all channels free) + │ + ▼ worker BLPOP ──► LLEN = M-1 ──► ... ──► LLEN = 0 (all busy) + │ + ▼ + next BLPOP blocks for timeout + worker reschedules via ZADD +``` + +Token identity does not matter — every token is interchangeable. What matters is count. + +**On outbound number config change** (`maximum_channels` edited): the reconciler will top up or drain tokens to match within 60s. For instant reflection, provide an admin endpoint that re-runs the init for that one number. 
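The top-up/drain performed by the token reconciler reduces to one piece of arithmetic; a sketch of its core (the function name is illustrative):

```python
def token_adjustment(maximum_channels: int, in_flight_calls: int,
                     available_tokens: int) -> int:
    """How many tokens the reconciler should LPUSH (positive) or pop (negative)
    so that LLEN bb:channel:{id} == maximum_channels - in-flight calls."""
    expected_available = maximum_channels - in_flight_calls
    return expected_available - available_tokens
```

For example, `token_adjustment(10, 3, 5)` is `2` (two tokens leaked and must be restored), while `token_adjustment(8, 0, 10)` is `-2` (drain two after `maximum_channels` was lowered from 10 to 8).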
## Query shapes (reference)
+
+### Ingest
+
+```python
+# Inside push_lead_handler, after the existing INSERT:
+await redis.zadd(
+    "bb:schedule:leads",
+    {lead_id: int(next_attempt_at.timestamp() * 1000)},
+)
+```
+
+### Promoter tick (Lua script for the batched move)
+
+The promoter reads the due ids with a plain `ZRANGEBYSCORE bb:schedule:leads 0 <now_ms> LIMIT 0 <batch>`, groups them by shard in Python, then issues one move script per shard — so an id is never `ZREM`'d without landing on a ready list:
+
+```lua
+-- KEYS[1]   = bb:schedule:leads
+-- KEYS[2]   = bb:ready:leads:{shard}
+-- ARGV[1..] = lead ids the caller has already resolved to this shard
+local moved = 0
+for i, id in ipairs(ARGV) do
+  if redis.call("ZREM", KEYS[1], id) == 1 then
+    redis.call("LPUSH", KEYS[2], id)
+    moved = moved + 1
+  end
+end
+return moved
+```
+
+*Implementation note:* grouping by shard in Python and issuing one `EVAL` per shard keeps the script simple and composable. The `ZREM` return value remains the guard against double-promotion.
+
+### Worker pick
+
+```python
+shard = f"bb:ready:leads:{my_shard}"
+popped = await redis.blpop(shard, timeout=30)
+if popped is None:
+    return  # periodic wake for shutdown signal
+_, lead_id = popped
+await redis.rpush(f"bb:processing:leads:{worker_uuid}", lead_id)
+```
+
+### Reconciler (periodic `reconcile_backlog_to_zset`)
+
+```sql
+SELECT id, EXTRACT(EPOCH FROM next_attempt_at) * 1000 AS score_ms
+FROM lead_call_tracker
+WHERE status = 'BACKLOG'
+  AND is_locked = FALSE
+  AND execution_mode IN ('TELEPHONY', 'TELEPHONY_TEST')
+  AND next_attempt_at <= NOW() + INTERVAL '10 minutes'
+ORDER BY next_attempt_at ASC
+LIMIT 1000;
+```
+
+Then, for each returned row, check `ZSCORE bb:schedule:leads id` — if missing, `ZADD`. The filter "next_attempt_at <= NOW() + 10 min" bounds the scan window; far-future leads are handled on a subsequent tick as their firing time approaches.
**This is the only SQL scan in the hot-adjacent path**, and it runs once a minute on a bounded time window with an index on `(status, next_attempt_at)`. Compare to the current cron which runs the same scan every 30s with *no* bound beyond "past due", and you see why scaling improves by orders of magnitude.
+
+### Reaper (periodic `reap_stuck_processing_lists`)
+
+```python
+# For each worker's crash-recovery list:
+async for key in redis.scan_iter(match="bb:processing:leads:*"):
+    worker_uuid = key.split(":")[-1]
+    if await redis.exists(f"bb:worker:heartbeat:{worker_uuid}"):
+        continue  # worker still alive: its heartbeat key has not expired
+    for lead_id in await redis.lrange(key, 0, -1):
+        lead = await get_lead_by_id(lead_id)
+        if lead.status == "PROCESSING":
+            # Call already in flight, just clean up tracking
+            await redis.lrem(key, 1, lead_id)
+        elif lead.status == "BACKLOG":
+            await redis.zadd("bb:schedule:leads", {lead_id: now_ms})
+            await redis.lrem(key, 1, lead_id)
+```
+
+Worker liveness is tracked via `bb:worker:heartbeat:{worker_uuid}` with TTL 60s, refreshed every 10s.
+
+## Sizing assumptions
+
+For capacity planning. Order-of-magnitude, not precise.
+
+| Metric | Assumed | Redis impact |
+|---|---|---|
+| Peak ingest rate | 1000 leads/minute | 17 ZADDs/sec |
+| Avg scheduling delay | 5 minutes | steady-state ZSET size ≈ 5000 members |
+| Peak simultaneous dispatch | 500/sec (if channels allow) | 500 BLPOPs/sec, 500 LPUSHes/sec |
+| Total channel tokens | 1000 (across all numbers) | 1000 LIST elements total |
+| Promoter tick | 200ms, 500 lead batch cap | 5 EVAL/sec at peak |
+| Reconciler | 60s, 1000-row scan | negligible |
+
+All of this fits comfortably on a single Redis node. Cluster is not required for scale; it would be an HA decision.
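The steady-state ZSET row in the sizing table is just Little's law (occupancy = arrival rate × time in system); a one-line sketch reproducing the table's numbers:

```python
def steady_state_members(ingest_per_minute: float, avg_delay_minutes: float) -> float:
    # Little's law: L = lambda * W
    # members in the ZSET = ingest rate * average scheduling delay
    return ingest_per_minute * avg_delay_minutes
```

At the assumed 1000 leads/minute with a 5-minute average delay, `steady_state_members(1000, 5)` gives the ~5000 members quoted above.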
diff --git a/docs/event-driven-dispatch/04-reliability.md b/docs/event-driven-dispatch/04-reliability.md new file mode 100644 index 000000000..176ba4293 --- /dev/null +++ b/docs/event-driven-dispatch/04-reliability.md @@ -0,0 +1,213 @@ +# 04 — Reliability and Failure Modes + +Distributed systems fail. This document is a per-failure-mode catalog of what goes wrong and how the design recovers. If a failure isn't listed here, either the design doesn't handle it or the document needs updating — both should be treated as a bug. + +## Delivery guarantees + +- **At-least-once dispatch.** A lead may be picked off the ready list more than once during failure scenarios (worker crash mid-pick). The worker's first action — `acquire_lock_on_lead_by_id` with `expected_status=BACKLOG` — is the idempotency guard. If the lead is already PROCESSING, the worker drops it on the floor. +- **Exactly-once call.** Only the worker that wins the DB row lock calls `provider.make_call`. This is enforced by Postgres, not by Redis. +- **At-most-once channel token consumption per call.** The token is held from `BLPOP` until either `make_call` fails (token returned immediately) or the call-end webhook fires (token returned then). If the webhook is lost, the token reconciler recovers it. + +## Failure catalog + +### Ingest path: pod dies between INSERT and ZADD + +**Symptom:** lead row exists in DB with status=BACKLOG, no ZSET entry, no dispatch. + +**Detection:** `reconcile_backlog_to_zset` running every 60s. + +**Recovery:** reconciler adds the missing ZSET entry within 60s. + +**User impact:** up to 60s additional dispatch latency for this one lead. Acceptable. + +**Why not an outbox/transaction:** cost of reconciler is negligible; cost of an outbox and relay process is real (operational burden, another failure mode). We pay the rare-crash-window latency to avoid the ongoing complexity. 
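The no-outbox choice implies the ingest handler treats the `ZADD` as best-effort; a sketch under that assumption, with `insert_lead_row` as a hypothetical injected helper:

```python
import logging

log = logging.getLogger("bb.ingest")

async def ingest_lead(db, redis, lead, insert_lead_row):
    # The INSERT is authoritative: if it fails, the request fails.
    lead_id, next_attempt_ms = await insert_lead_row(db, lead)
    # The ZADD is best-effort: a crash or Redis outage in this window leaves
    # a BACKLOG row with no ZSET entry, which the 60s reconciler heals.
    try:
        await redis.zadd("bb:schedule:leads", {lead_id: next_attempt_ms})
    except Exception:
        log.exception("ZADD failed for lead %s; reconciler will heal it", lead_id)
    return lead_id  # the handler can still return 201
```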
+ +--- + +### Redis goes down (cold restart, OOM, crash) + +**Symptom:** promoter can't connect, workers block on `BLPOP` forever, ingest fails to `ZADD` (the INSERT still succeeds). + +**Detection:** Redis client errors. Metric: `redis_connection_errors_total`. + +**Recovery:** When Redis comes back up: + +1. Boot-time reconciler runs (triggered from app lifespan or scheduled immediately): + - `DEL bb:schedule:leads` (defensive — it's empty anyway) + - Scan all BACKLOG rows with `next_attempt_at <= NOW() + 1 day` and bulk `ZADD`. + - Re-initialize all channel semaphores. +2. Promoter resumes leader election and starts ticking. +3. Workers' `BLPOP` timers expire and they retry. + +**User impact:** dispatch pauses for the Redis outage duration. Ingest `POST /leads` still returns 201 because the DB write succeeded; the `ZADD` failure is logged and the reconciler will pick it up. No lead is lost. + +**Critical design point:** ingest must not fail the HTTP request if Redis is down. Log the `ZADD` failure and return 201. The DB write is authoritative. + +--- + +### Redis data loss (e.g. no AOF, restarted without dump) + +**Symptom:** same as above, plus all ZSET entries and channel tokens gone. + +**Recovery:** identical to "Redis goes down" — the boot reconciler rebuilds from Postgres. Channel semaphores re-init from `outbound_number.maximum_channels`. + +**Why this is safe:** Redis holds no authoritative state. Every key is derivable from the DB. + +--- + +### Promoter leader dies + +**Symptom:** no leads move from `schedule:leads` to `ready:leads:*`. + +**Detection:** lag metric on `schedule:leads` — members with score < now that haven't been consumed. + +**Recovery:** leader lock TTL is 5s. Within 5s of leader death, another pod's candidate loop acquires `bb:promoter:leader` and takes over. + +**User impact:** up to 5s of dispatch pause. Negligible. + +**Guard against split-brain:** the lock is renewed every 2s with a CAS (`SET ... 
XX GET` then compare value to self). If renewal fails, the process stops promoting immediately. A brief overlap during hand-off is harmless because `ZREM` is the authoritative pick — two promoters racing just means the slower `ZREM` returns 0 and the lead is only moved once.
+
+---
+
+### Worker crashes between BLPOP and finish
+
+Four sub-cases, depending on how far the worker got.
+
+**Sub-case A: crashed between `BLPOP ready` and `RPUSH processing`**
+
+- Lead is lost from Redis. DB row is still BACKLOG.
+- Recovered by `reconcile_backlog_to_zset` within 60s.
+- Window is tiny (a sub-millisecond gap between two consecutive Redis calls). Rare.
+
+**Sub-case B: crashed after `RPUSH processing`, before DB row lock**
+
+- Lead is in the `processing:leads:{worker_uuid}` list. DB row is BACKLOG, not locked.
+- Recovered by `reap_stuck_processing_lists`: after 5 min of worker heartbeat silence, the reaper re-ZADDs to the schedule ZSET and `LREM`s it from the processing list.
+
+**Sub-case C: crashed after `make_call` succeeded, before `LREM` from processing list**
+
+- DB row is PROCESSING with `call_id` set.
+- Lead is still in `processing:leads:{worker_uuid}`.
+- Reaper checks DB status, sees PROCESSING, just cleans up the processing list entry. Does NOT re-dispatch. No duplicate call.
+
+**Sub-case D: crashed after `make_call` succeeded, before channel token is released**
+
+- This is a call-end webhook concern, not a worker crash concern. The token is held by the "in-flight call", which is now orphaned.
+- Recovered by `reconcile_channel_tokens`: the reconciler computes the expected token count from DB in-flight calls; if Redis is short, it tops up.
+
+---
+
+### Call-end webhook never arrives
+
+**Symptom:** channel token never returned. Over time, `LLEN bb:channel:{id}` drifts below expected.
+
+**Detection:** `reconcile_channel_tokens` running every 60s compares the DB in-flight count to the Redis token count.
+
+**Recovery:** top up missing tokens.
Also update `lead_call_tracker` status to FINISHED with `outcome=WEBHOOK_LOST` if the call has been "PROCESSING" for longer than a sane upper bound (e.g. 30 min). + +--- + +### Duplicate webhook (provider retries) + +**Symptom:** `LPUSH channel:{id} token` called twice for the same call end. `LLEN` exceeds M. + +**Impact:** None immediately — extra tokens just allow more concurrent calls on that number. + +**Detection:** `reconcile_channel_tokens` sees `LLEN > M - in_flight` and `LTRIM`s the excess. + +**Recovery:** self-correcting within 60s. + +--- + +### Stuck row lock (`is_locked=TRUE` for long time) + +**Symptom:** lead stays in BACKLOG with `is_locked=TRUE` indefinitely. + +**Cause:** worker crashed between acquiring the row lock and either (a) finishing the call, or (b) releasing the lock on error. + +**Detection:** query for `is_locked=TRUE AND locked_at < now - 10 min`. + +**Recovery:** `clean_stale_bb_locks` every 5 min unlocks them. + +--- + +### DB goes down + +**Symptom:** `POST /leads` returns 500; workers fail on the initial SELECT after `BLPOP`. + +**Worker behavior:** on DB error, release the channel token (if acquired), return the lead to the ZSET with a short backoff (5-10s), and continue the loop. + +**Ingest behavior:** returns 500 to the client. Client retries as usual. + +**Why this is safe:** no partial state is persisted to Redis that would later cause a phantom call. + +--- + +### One reseller overloads a shard + +**Symptom:** `ready:leads:{shard}` grows large; workers on that shard can't keep up; other shards are idle. + +**Detection:** per-shard `LLEN` metric. + +**Mitigations in order:** + +1. Pause the offending reseller via `bb:reseller:paused:{id}`. Workers drop their leads back to the ZSET with a delay. +2. Rebalance — change shard count or per-shard worker count (requires restart; accept as a known limitation). +3. Longer-term: add per-reseller concurrency cap as a second BLPOP semaphore (`bb:reseller:cap:{id}`). 
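Mitigation 1's pause check is a single `EXISTS` on the worker path; a sketch, assuming redis-py asyncio and treating the re-queue delay as a tunable:

```python
import random

async def maybe_defer_paused_reseller(redis, lead_id, reseller_id, now_ms):
    """Return True when the lead was re-queued because its reseller is paused."""
    # The pause key is set/deleted by an operator; EXISTS is the whole check.
    if not await redis.exists(f"bb:reseller:paused:{reseller_id}"):
        return False
    # Re-enter the delayed ZSET with jitter so a mass un-pause doesn't dump
    # everything into the same promoter tick. 30-60s is illustrative.
    delay_ms = 30_000 + random.randint(0, 30_000)
    await redis.zadd("bb:schedule:leads", {lead_id: now_ms + delay_ms})
    return True
```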
+
+---
+
+### Provider is flaky (make_call returns errors)
+
+**Symptom:** repeated failures on calls to a specific provider.
+
+**Recovery:** the existing retry logic carries over unchanged. The worker catches the exception, releases the channel token, and re-ZADDs with exponential backoff. `attempt_count >= max_retry` transitions the lead to FINISHED with outcome FAILED.
+
+**Provider fallback (today's logic at `calls.py` — picking Exotel when Twilio fails):** preserved; it happens inside the worker's `make_call` path, not at the dispatch layer.
+
+---
+
+### Time desync between pods
+
+**Symptom:** pod clocks disagree; a promoter on a slow-clock pod passes a lagging `now` to `ZRANGEBYSCORE` and promotes due leads slightly late.
+
+**Impact:** marginal scheduling skew bounded by clock drift. With NTP-synced hosts this is < 100ms.
+
+**Mitigation:** none required. The leader is one pod at a time, so there's no inter-pod race on the ZSET range. If the leader's clock is off, it's off — the next leader (a different pod) corrects it.
+
+---
+
+### Retry storm after an outage recovery
+
+**Symptom:** the reconciler re-ZADDs thousands of leads with score = now, and workers try to dispatch them all at once.
+
+**Mitigation:** the reconciler adds small random jitter to each `ZADD` score (`now + rand(0, 2000ms)`). 10k leads spread over 2s, not 1ms. Combined with the channel semaphore, dispatch naturally paces to capacity.
+
+---
+
+### Poison leads (always fail)
+
+**Symptom:** a lead hits max retries and keeps consuming channel slots.
+
+**Detection:** the existing `attempt_count >= max_retry` check transitions it to FINISHED with outcome FAILED. Same as today.
+
+**Optional:** a dead-letter ZSET `bb:dlq:leads` for manual inspection. Not part of the critical path; can be added later.
+
+## Idempotency summary
+
+The entire system assumes **at-least-once delivery** at the Redis layer and **exactly-once effect** at the DB layer. The four enforcement points:
+
+1. 
**Promoter:** `ZREM` returns 0 if another promoter got there first → lead is not moved. +2. **Worker pick:** status check after `BLPOP` — if not BACKLOG, drop. +3. **Worker row lock:** `acquire_lock_on_lead_by_id` with `expected_status=BACKLOG` — atomic UPDATE, fails if already locked. +4. **Provider call:** `UPDATE ... WHERE id=? AND status=BACKLOG` after `make_call` — if two workers somehow got past (3), only one UPDATE succeeds. + +Four independent guards. Each one alone would prevent duplicate dispatch; having all four makes it impossible under any failure scenario that still obeys the laws of Postgres. + +## What could still go wrong + +Honest list of known-unknowns: + +- **Redlock-based leader election is not bulletproof under adversarial conditions** (GC pauses, network partitions with clock skew). For our use case it's sufficient because a brief double-promote is absorbed by the `ZREM`-returns-0 guard. If we later find real double-promote in production logs, we switch to a fencing-token approach. +- **Channel semaphore isn't fault-tolerant to Redis partition in multi-DC setup.** We don't run multi-DC; single-DC single-Redis assumption is fine. +- **Very long scheduling horizons** (e.g. schedule a lead 30 days out). The ZSET grows. Today's `initial_offset` is typically minutes; if someone schedules for days, it still works, the ZSET just holds it. diff --git a/docs/event-driven-dispatch/05-migration.md b/docs/event-driven-dispatch/05-migration.md new file mode 100644 index 000000000..b7f829890 --- /dev/null +++ b/docs/event-driven-dispatch/05-migration.md @@ -0,0 +1,157 @@ +# 05 — Migration Plan + +No flag day. Per-reseller rollout with rollback at every phase. The cron continues running as the canonical path until we've proven the event path in production. + +## Guiding principles + +1. **DB is source of truth throughout.** Both paths read and write the same `lead_call_tracker` rows. They cannot diverge. +2. 
**Dual-write is cheap.** `ZADD` on ingest is a single Redis op; adding it doesn't break anything. +3. **Feature flag per reseller.** Dispatch path selection is a per-reseller flag. Rollback == flip one flag. +4. **Cron is the safety net.** In early phases, if the event path misses a lead, cron picks it up 30s later. Workers in the event path must not cause double-dispatch (they won't — see [04-reliability.md](04-reliability.md)). + +## Phase 0 — Preparation (no behavior change) + +**Duration:** ~1 sprint. + +**Actions:** + +- Add Redis client wiring for the new keys (already have `get_redis_service`). +- Implement the dispatch module (`app/ai/voice/agents/breeze_buddy/dispatch/`). +- Add the optional `lead_call_tracker.dispatched_at` and `shard` columns as migration `026_event_dispatch_columns.sql`. +- Register control-plane reconcilers on `BackgroundTaskScheduler` but have them run in **read-only mode** (compute what they'd do, log it, don't act). This validates the queries on real data. +- Add metrics: ZSET size, ready-list size per shard, processing-list size per worker, channel-token `LLEN` per number, promoter tick duration, leader identity. +- **Do not** start the promoter. **Do not** start workers. Event path is dark. + +**Exit criteria:** all code merged, metrics visible in the dashboard, reconcilers running in read-only mode with zero errors. + +## Phase 1 — Dual-write (ingest only) + +**Duration:** 2-3 days in staging, 1 week in prod. + +**Actions:** + +- `POST /leads` does `INSERT + ZADD` (new). Cron still does the dispatch. +- Start the promoter and workers, but behind a global kill-switch flag `bb:dispatch:enabled = false`. If set, workers `BLPOP` but immediately re-ZADD the lead (no-op consumption). This lets us validate promoter latency, worker pickup time, shard distribution without actually dispatching. +- Reconcilers move from read-only to active. + +**Observability focus:** + +- `ZADD` error rate on ingest (should be < 0.01%). 
+- Promoter tick duration and lag (leads with score < now in ZSET). +- Reconciler `reconcile_backlog_to_zset` re-enqueue count — should drop to near-zero after Phase 0's drift is cleared. + +**Exit criteria:** promoter lag < 500ms at p99 under production traffic. No reconciler-driven re-enqueues outside known crash windows. + +## Phase 2 — Shadow dispatch (one reseller, low volume) + +**Duration:** 1 week. + +**Actions:** + +- Pick one low-risk reseller (internal test account or the smallest production tenant). +- Set their feature flag `bb:dispatch:use_event_path:{reseller_id} = true`. +- Cron's `process_backlog_leads` now **skips leads from resellers with this flag set**. The event path owns them. +- Workers dispatch normally for this reseller. + +**What we're validating:** + +- End-to-end dispatch via the event path matches cron's behavior. +- Call outcomes, recording, webhooks, retries all work. +- No drift between DB state and Redis state. + +**Rollback:** flip the flag to false. Cron resumes picking up those leads on its next tick. Any lead already mid-dispatch on the event path completes normally (it's already PROCESSING). + +**Exit criteria:** one week of zero event-path-specific incidents for this reseller. Dispatch latency p50 < 500ms. Call outcome rates within 2% of the cron baseline. + +## Phase 3 — Graduated rollout (1% -> 10% -> 50%) + +**Duration:** 2-3 weeks. + +**Actions:** + +- Enable the flag for progressively larger slices of resellers, sorted by volume so low-volume goes first. +- After each bump, watch for a week. Key metrics: + - Dispatch latency percentiles. + - Reconciler-driven re-enqueues (should stay near zero). + - Channel-token `LLEN` vs expected (divergence indicates webhook issues). + - Per-shard ready-list size (hot-shard detection). + +**Rollback:** per-reseller flag flip. Cron takes over. + +**Exit criteria:** 50% of resellers on event path for at least 1 week with no path-specific regressions. 
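The cron-side skip used in Phases 2 and 3 reduces to filtering backlog rows through the per-reseller flag; a sketch with hypothetical names:

```python
def leads_for_cron(backlog_rows, event_path_enabled):
    """Cron keeps only leads whose reseller is NOT flagged onto the event path.

    backlog_rows: iterable of (lead_id, reseller_id) pairs from the cron scan;
    event_path_enabled: the per-reseller feature-flag lookup (DevCycle here).
    """
    return [(lead_id, reseller_id)
            for lead_id, reseller_id in backlog_rows
            if not event_path_enabled(reseller_id)]
```

Keeping the flag check in one place makes Phase 5's cleanup a one-function deletion.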
+ +## Phase 4 — Global flip + +**Duration:** 1 day + 1 week soak. + +**Actions:** + +- Enable the event path for the remaining 50%. +- Cron's `process_backlog_leads` now always returns an empty result set (all resellers flagged on). +- Leave cron running as a no-op for one week so rollback is trivial. + +**Rollback:** global flag flip returns all resellers to cron. Cron resumes. + +**Exit criteria:** one week of 100% event-path dispatch with no regression. + +## Phase 5 — Remove cron + +**Duration:** 1 PR. + +**Actions:** + +- Delete `process_backlog_leads` in `app/ai/voice/agents/breeze_buddy/managers/calls.py`. +- Delete the `/cron/initiate` route and the `cron_router` include. +- Delete the per-reseller flag (no longer branching). +- Update CLAUDE.md, remove references to the cron. + +**Retain:** + +- The `BackgroundTaskScheduler` — now hosting only control-plane reconcilers. +- The `lead_call_tracker.is_locked` column — used by workers. +- The `increment_outbound_number_channels` DB function — used by the channel reconciler to top up tokens. + +**Exit criteria:** the word "cron" appears in zero places outside docs that describe historical behavior. + +## Rollback decision tree + +At any phase, if something looks wrong: + +``` + metric anomaly detected + │ + ▼ + Is it event-path-specific? ── no ──► investigate unrelated cause + │ + yes + ▼ + Is the reseller flag scoped? ── yes ──► flip one reseller's flag + │ + no + ▼ + Flip global dispatch flag (Phase 1-3: falls back to cron + Phase 4+: requires cron re-enable) +``` + +## Schema migration caveats + +- `026_event_dispatch_columns.sql` adds `dispatched_at` and `shard` columns. Both nullable to avoid backfill. New rows get them populated; old rows leave them NULL. +- Do not drop `is_locked`. Workers still use it. +- Do not drop `attempt_count`. Retries still increment it. + +## Feature flag plumbing + +Use the existing DevCycle dynamic config (`app/core/config/dynamic.py`). 
Flag names: + +- `bb_dispatch_enabled` (global, default false) +- `bb_dispatch_use_event_path_for_reseller_{reseller_id}` (per-reseller, default false) + +Check the per-reseller flag in ingest (to decide whether to ZADD) and in cron (to decide whether to skip). Check the global flag in the worker to no-op if dispatching is paused. + +## What can go wrong during migration + +- **Dual-picking during flag transition.** When a reseller's flag flips from false to true, a lead ingested just before the flip has a DB row but no ZSET entry. Cron, seeing the flag now true, skips it. Lead is stuck until the reconciler picks it up. + - **Mitigation:** the `reconcile_backlog_to_zset` reconciler runs every 60s. Stuck window is bounded. + - **Better mitigation:** on flag flip, trigger an immediate one-shot reconciler run for that reseller. +- **Orphaned cron scans.** During Phase 1-3, cron still runs the scan but filters out flagged resellers. The scan cost doesn't disappear; it just returns fewer rows. This is fine, it's temporary. +- **Channel token double-init.** If the channel init code runs twice (e.g. restart during init), we'd have 2M tokens instead of M. The reconciler detects and trims within 60s. For belt-and-suspenders, init uses `DEL` before `RPUSH`. diff --git a/docs/event-driven-dispatch/06-operations.md b/docs/event-driven-dispatch/06-operations.md new file mode 100644 index 000000000..da32deca2 --- /dev/null +++ b/docs/event-driven-dispatch/06-operations.md @@ -0,0 +1,235 @@ +# 06 — Operations + +Tuning knobs, metrics, alerts, and runbooks for the event-driven dispatch system. + +## Tuning knobs + +All live in `app/core/config/static.py` (env-var driven) unless marked otherwise. + +### Promoter + +| Knob | Default | Meaning | When to tune | +|---|---|---|---| +| `BB_PROMOTER_TICK_MS` | 200 | How often the leader runs `ZRANGEBYSCORE`. | Lower to reduce dispatch latency (at cost of more Redis ops). Raise if Redis CPU high. 
| +| `BB_PROMOTER_BATCH` | 500 | Max leads moved per tick. | Raise if a single tick can't keep up with due leads. Watch for Redis command latency. | +| `BB_PROMOTER_LEADER_TTL_S` | 5 | Redlock TTL. | Rarely tuned. | +| `BB_PROMOTER_LEADER_RENEW_S` | 2 | Heartbeat interval (must be < TTL). | Rarely tuned. | + +### Workers + +| Knob | Default | Meaning | When to tune | +|---|---|---|---| +| `BB_WORKERS_PER_SHARD` | 4 | Async tasks per pod per shard. | Raise for hotter shards. | +| `BB_WORKER_BLPOP_TIMEOUT_S` | 30 | Worker `BLPOP` timeout (for shutdown responsiveness). | Rarely tuned. | +| `BB_SHARD_COUNT` | 16 | Total shards. **Deploy-time constant.** | Set once; changing requires careful migration. | +| `BB_WORKER_HEARTBEAT_TTL_S` | 60 | Worker liveness key TTL. | Rarely tuned. | +| `BB_WORKER_HEARTBEAT_REFRESH_S` | 10 | Worker heartbeat refresh interval. | Must be < TTL. | + +### Channel semaphore + +| Knob | Default | Meaning | When to tune | +|---|---|---|---| +| `BB_CHANNEL_BLPOP_TIMEOUT_S` | 10 | How long worker waits for a free channel. | Lower -> more re-scheduling, less provider timeout risk. Raise -> smoother dispatch at peak. | +| `BB_CHANNEL_WAIT_BACKOFF_MAX_S` | 3 | Jitter range added to ZADD when channels were unavailable. | Rarely tuned. | + +### Reconcilers + +| Knob | Default | Meaning | +|---|---|---| +| `BB_RECONCILE_BACKLOG_INTERVAL_S` | 60 | How often to heal missing ZSET entries. | +| `BB_RECONCILE_BACKLOG_LIMIT` | 1000 | Max rows scanned per reconciler tick. | +| `BB_REAP_PROCESSING_INTERVAL_S` | 30 | Stuck-worker reaper interval. | +| `BB_REAP_PROCESSING_STUCK_AFTER_S` | 300 | Worker considered stuck after this much heartbeat silence. | +| `BB_RECONCILE_CHANNELS_INTERVAL_S` | 60 | Channel-token reconciler. | + +### Operational flags (Redis-backed, runtime-flippable) + +| Key | Default | Effect | +|---|---|---| +| `bb:dispatch:enabled` | `true` | Kill-switch. If `false`, workers `BLPOP` but no-op. 
| +| `bb:promoter:paused` | absent | If exists, promoter skips its tick. | +| `bb:reseller:paused:{id}` | absent | If exists, workers re-ZADD leads for this reseller without dispatching. | + +## Metrics + +Emit via existing OTEL setup (`app/ai/voice/agents/breeze_buddy/observability/`). + +### Gauges + +| Metric | Labels | Purpose | +|---|---|---| +| `bb_schedule_size` | - | `ZCARD bb:schedule:leads` | +| `bb_schedule_overdue_count` | - | Leads in ZSET with score < now. Should be small. | +| `bb_ready_list_size` | shard | `LLEN bb:ready:leads:{shard}` | +| `bb_processing_list_size` | worker_uuid | Per-worker in-flight count | +| `bb_channel_tokens_available` | number_id | `LLEN bb:channel:{id}` | +| `bb_channel_tokens_expected` | number_id | DB-derived expected count | +| `bb_promoter_leader_pod` | pod_id | 1 on the leader, 0 elsewhere | +| `bb_workers_alive` | shard | Count of workers heartbeating | + +### Counters + +| Metric | Labels | Purpose | +|---|---|---| +| `bb_ingest_zadd_total` | result | Increment on every ingest `ZADD`. result=ok/fail. | +| `bb_promoter_ticks_total` | result | Promoter tick count. | +| `bb_promoter_moved_total` | - | Total leads moved from ZSET to ready. | +| `bb_worker_picked_total` | shard, outcome | Worker picked a lead. outcome=dispatched/skipped/rescheduled/errored. | +| `bb_channel_wait_timeout_total` | number_id | Worker gave up waiting for a channel. | +| `bb_reconciler_fix_total` | reconciler | Number of inconsistencies fixed. | +| `bb_leader_elections_total` | - | Every time a pod becomes leader. | + +### Histograms + +| Metric | Labels | Purpose | +|---|---|---| +| `bb_promoter_tick_duration_ms` | - | ZRANGEBYSCORE + moves. | +| `bb_dispatch_latency_ms` | - | `next_attempt_at` to `make_call` attempt. | +| `bb_channel_wait_duration_ms` | number_id | BLPOP wait time on channel semaphore. | +| `bb_worker_process_duration_ms` | outcome | Full worker loop iteration. 
| + +## Alerts + +Recommended, in priority order: + +| Alert | Condition | Severity | +|---|---|---| +| Dispatch halted | `bb_promoter_moved_total` rate = 0 for > 2 min AND `bb_schedule_overdue_count` > 10 | P0 | +| No leader | `sum(bb_promoter_leader_pod) == 0` for > 30s | P0 | +| Dispatch latency high | `bb_dispatch_latency_ms` p99 > 5s for 5 min | P1 | +| Ingest ZADD failing | `bb_ingest_zadd_total{result="fail"}` rate > 1% | P1 | +| Channel token drift | `abs(bb_channel_tokens_available - bb_channel_tokens_expected) > 5` sustained 2 min | P2 | +| Reconciler working hard | `bb_reconciler_fix_total{reconciler="backlog_to_zset"}` > 10/min | P2 | +| Hot shard | `bb_ready_list_size` > 100 sustained for one shard | P3 | +| Workers dying | `bb_workers_alive` drops > 50% | P2 | + +## Runbooks + +### Dispatch halted + +**Symptoms:** lead dispatch stops; `bb_schedule_overdue_count` climbs. + +**Checklist:** + +1. Check `bb_promoter_leader_pod` — is there a leader? +2. If no leader: check Redis connectivity from all pods. Check for `bb:promoter:paused` key existence (someone paused it). +3. If leader exists but not moving leads: check `bb_promoter_tick_duration_ms`. Hung Redis? +4. Check worker health: `bb_workers_alive`. Are workers `BLPOP`ing? +5. Check `bb:dispatch:enabled` flag. Is it accidentally false? + +**Emergency recovery:** + +- If promoter is stuck, kill the leader pod. New leader elects within 5s. +- If Redis is unreachable, dispatch resumes when Redis is back; boot reconciler restores state. +- If workers are all dead (process pool issue), rolling restart pods. + +### Channel token drift + +**Symptoms:** `bb_channel_tokens_available` differs from expected by > 5. + +**Likely causes:** + +1. Call-end webhook losses (token not returned). +2. Duplicate webhooks (token returned twice). +3. Worker crashed after `make_call` but before releasing on error. + +**Checklist:** + +1. Check `reconcile_channel_tokens` log — is it detecting and fixing? 
If yes, watch for 2 min; it should converge.
+2. If drift is growing despite the reconciler, check for provider webhook failures in the telephony callback logs.
+3. Manual fix (last resort): `DEL bb:channel:{id}` then restart the pod that owns channel init. The reconciler will rebuild from the DB.
+
+### Hot shard
+
+**Symptoms:** one shard's ready list grows continuously.
+
+**Likely cause:** one reseller dominates that shard.
+
+**Immediate mitigation:**
+
+- Pause the offending reseller: `SET bb:reseller:paused:{id} 1`. Workers will re-ZADD their leads; the peak drains.
+- Un-pause when the spike passes.
+
+**Longer-term:**
+
+- Increase `BB_WORKERS_PER_SHARD`.
+- Consider sharding by `(reseller_id, template)` pair if one template dominates.
+
+### Lead stuck in BACKLOG forever
+
+**Symptoms:** a specific lead stays BACKLOG well past its `next_attempt_at`.
+
+**Checklist:**
+
+1. Is the reseller paused? Check `bb:reseller:paused:{id}`.
+2. Is the row `is_locked=TRUE`? Check `locked_at` — if old, `clean_stale_bb_locks` should handle it within 5 min.
+3. Is it in `bb:schedule:leads`? Run `ZSCORE bb:schedule:leads <lead_id>`.
+   - If missing, `reconcile_backlog_to_zset` will add it within 60s.
+   - If present with score > now, it's correctly waiting.
+4. Check if the lead's number has available channels — the worker might be stuck waiting.
+
+**Manual fix:**
+
+```
+ZADD bb:schedule:leads <now_ms> <lead_id>
+```
+
+### Startup failed, pod stuck
+
+**Symptoms:** pod starts but dispatch doesn't begin.
+
+**Checklist:**
+
+1. Check startup logs for "channel semaphore init failed" or "promoter start failed".
+2. If Redis was unreachable at startup, the semaphore init would have logged and retried. Confirm Redis is now healthy and restart the pod.
+3. Confirm migrations have run (`026_event_dispatch_columns.sql`).
+
+### Rolling deploy behavior
+
+- A pod going down mid-dispatch leaves entries in `bb:processing:leads:{worker_uuid}`. The reaper recovers them within 5 min.
+- A pod coming up does NOT need to do a full reconcile — the periodic reconciler already runs on every pod. +- The leader might hop between pods during deploy. Leader-election handles this naturally. +- **Do not run `FLUSHDB` on the Redis instance.** If you must, the boot reconciler will rebuild, but any in-flight call's channel token is gone (DB still correct, just tokens wrong). Recovery within 60s. + +## Dashboards + +Minimal dashboard suggestion (Grafana or equivalent): + +**Row 1 - Health** + +- `bb_promoter_leader_pod` (stat — current leader) +- `bb_schedule_overdue_count` (graph) +- `bb_dispatch_latency_ms` p50/p95/p99 (graph) + +**Row 2 - Throughput** + +- `bb_promoter_moved_total` rate (graph) +- `bb_worker_picked_total` by outcome (stacked graph) +- `bb_ingest_zadd_total` by result (graph) + +**Row 3 - Capacity** + +- `bb_channel_tokens_available` vs `bb_channel_tokens_expected` per number (graph) +- `bb_channel_wait_duration_ms` p99 (graph) +- `bb_channel_wait_timeout_total` rate (graph) + +**Row 4 - Sharding** + +- `bb_ready_list_size` per shard (graph, side-by-side) +- `bb_workers_alive` per shard (graph) +- `bb_processing_list_size` sum (graph — rough in-flight count) + +**Row 5 - Reconcilers** + +- `bb_reconciler_fix_total` per reconciler (graph) +- Reconciler duration histogram (graph) + +## Capacity planning checklist + +Before a known high-volume campaign: + +1. Sum up `maximum_channels` across active numbers. This is your calls/sec ceiling (roughly; depends on call duration). +2. Check current `bb_schedule_size` baseline. +3. Estimate expected peak ingest rate. ZSET should handle orders of magnitude more. +4. Verify worker count per shard can saturate channels. Rough formula: `workers_per_shard >= channels_per_shard`. If you have 100 channels and 16 shards, ~6-8 workers/shard is a starting point. +5. Confirm Redis instance headroom (CPU, memory). 
At the scale described in [03-data-model.md](03-data-model.md) this is negligible, but worth checking. diff --git a/docs/event-driven-dispatch/README.md b/docs/event-driven-dispatch/README.md new file mode 100644 index 000000000..8e969575a --- /dev/null +++ b/docs/event-driven-dispatch/README.md @@ -0,0 +1,43 @@ +# Event-Driven Lead Dispatch + +> **Status:** Design document. Not yet implemented. Replaces the current cron-based backlog picker (`process_backlog_leads` in `app/ai/voice/agents/breeze_buddy/managers/calls.py`) and the `/cron/initiate` trigger. + +## What this is + +A Redis-backed, event-driven replacement for the current polling-based lead dispatch in Breeze Buddy. Leads become events on a delayed queue; a small pool of promoter + worker processes consume them at the moment they are due, throttled only by telephony channel capacity. + +## What it replaces + +| Today | New | +|---|---| +| Cron pings `/cron/initiate` every 30s | No cron on the hot path | +| `process_backlog_leads` does `SELECT ... WHERE status=BACKLOG AND next_attempt_at <= NOW()` on every tick | No periodic SELECT on the hot path | +| Iterates leads serially; a slow batch overlaps with the next tick | Work is parallel, bounded by channel capacity | +| Retries = new BACKLOG rows picked up by the next scan | Retries = `ZADD` back to the delayed queue | +| Earliest dispatch latency: 0 - 30s | Earliest dispatch latency: ~200ms - 500ms | + +The existing `BackgroundTaskScheduler` (`app/core/background_tasks/`) is **kept** and repurposed for slow control-plane chores (reconcilers, janitors). It is not used for dispatch. + +## Key properties + +- **Throughput ceiling** = sum of `maximum_channels` across active outbound numbers. Not cron frequency, not worker count. +- **Dispatch latency** ≈ promoter tick (configurable, default 200ms) + worker pickup + provider round-trip. +- **No hot-path DB polling.** Ingest writes an event; promoter reads from Redis; workers read from Redis. 
DB is touched per-lead, not per-tick.
+- **Source of truth stays in Postgres.** Redis is a dispatch fabric. Every Redis structure has a DB-driven reconciler behind it.
+- **Horizontally scalable.** Add pods → more workers → more throughput, up to the telephony ceiling. No single lock serializes the work.
+- **Safe to lose Redis.** A reconciler on pod boot rebuilds the ZSET from the `lead_call_tracker` table. No data loss, just a brief scheduling gap.
+
+## Reading order
+
+| Doc | What you'll find |
+|---|---|
+| [01-motivation.md](01-motivation.md) | Exactly why the current cron breaks at scale, with numbers. Read this first if you want the "why". |
+| [02-architecture.md](02-architecture.md) | The five planes (Ingest, Schedule, Promote, Dispatch, Control), ASCII diagrams, component responsibilities. |
+| [03-data-model.md](03-data-model.md) | Every Redis key, its type, TTL, producer, consumer. DB column additions if any. Lead state machine. |
+| [04-reliability.md](04-reliability.md) | Failure modes and recovery. What happens when a worker dies, when Redis restarts, when a webhook is lost. |
+| [05-migration.md](05-migration.md) | Phased dual-write rollout. No flag day. Per-reseller flag. Rollback path. |
+| [06-operations.md](06-operations.md) | Tuning knobs, metrics, alerts, runbooks. |
+
+## One-paragraph summary
+
+`POST /leads` writes to Postgres and `ZADD`s the lead id into `schedule:leads` with score = `next_attempt_at`. Every 200ms, a leader-elected promoter runs `ZRANGEBYSCORE schedule:leads 0 <now>`, `ZREM`s the due leads, and `LPUSH`es them onto sharded `ready:leads:{shard}` lists. Worker tasks in every pod `BLPOP` from their shard's ready list, acquire a DB row lock, pass pre-checks, and `BLPOP` a token from `channel:{outbound_number_id}` before dialing. On the call-end webhook, the token is `LPUSH`ed back. Retries re-enter the ZSET. The existing `BackgroundTaskScheduler` runs the reconcilers that heal Redis-vs-DB drift.
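The worker side of that summary, as a sketch — every name here is illustrative, with the row-lock, pre-check, and channel-token steps folded into a single injected `dispatch_one`:

```python
import asyncio

async def worker_loop(redis, shard, worker_uuid, dispatch_one, stop):
    ready = f"bb:ready:leads:{shard}"
    processing = f"bb:processing:leads:{worker_uuid}"
    while not stop.is_set():
        popped = await redis.blpop(ready, timeout=30)
        if popped is None:
            continue  # BLPOP timeout doubles as the periodic shutdown check
        _, lead_id = popped
        # Track the in-flight lead so the reaper can recover it if we die here.
        await redis.rpush(processing, lead_id)
        try:
            # dispatch_one: DB row lock -> pre-checks -> channel token -> make_call
            await dispatch_one(lead_id)
        finally:
            await redis.lrem(processing, 1, lead_id)
```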