docs: event-driven lead dispatch design + endpoint reference fixes #722

swaroopvarma1 wants to merge 1 commit into
Conversation
Adds a design document set under docs/event-driven-dispatch/ describing the proposed Redis-backed event-driven replacement for the cron-based backlog picker. Covers motivation, architecture (ingest, delayed ZSET, leader-elected promoter, sharded workers, channel semaphore, control plane reconcilers), data model, failure modes and recovery, phased dual-write migration, and operations runbooks. Also corrects stale /push/lead/v2 references in CLAUDE.md and BREEZE_BUDDY_ARCHITECTURE.md — the current endpoint is POST /leads, handled in app/api/routers/breeze_buddy/leads/. No code changes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walkthrough: Updated documentation for the Breeze Buddy lead intake endpoint from the legacy `/push/lead/v2` path to `POST /leads`.

Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~15 minutes
🚥 Pre-merge checks: ✅ 5 passed
Pull request overview
Adds a new design-doc set proposing an event-driven, Redis-backed replacement for the current cron-based Breeze Buddy lead dispatch path, and updates existing documentation to reference the correct lead ingestion endpoint (POST /leads).
Changes:
- Introduces `docs/event-driven-dispatch/` (motivation → architecture → data model → reliability → migration → operations) as a design proposal for replacing `/cron/initiate` + `process_backlog_leads`.
- Updates stale `/push/lead/v2` references to `POST /leads` in existing docs (`CLAUDE.md`, `docs/BREEZE_BUDDY_ARCHITECTURE.md`).
- Expands the Breeze Buddy architecture doc's leads-router endpoint listing and links.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/event-driven-dispatch/README.md | Landing page + summary of the proposed event-driven dispatch approach |
| docs/event-driven-dispatch/01-motivation.md | Documents current cron/polling scaling pain points and requirements |
| docs/event-driven-dispatch/02-architecture.md | Describes the proposed five-plane architecture and component placement |
| docs/event-driven-dispatch/03-data-model.md | Defines Redis keyspace + optional DB columns + state machine |
| docs/event-driven-dispatch/04-reliability.md | Catalogs failure modes and recovery strategies |
| docs/event-driven-dispatch/05-migration.md | Phased dual-write rollout plan + flags/rollback approach |
| docs/event-driven-dispatch/06-operations.md | Proposed knobs/metrics/alerts/runbooks for operating the system |
| docs/BREEZE_BUDDY_ARCHITECTURE.md | Fixes endpoint references and updates leads-router location/endpoints |
| CLAUDE.md | Fixes lead ingestion endpoint reference |
## One-paragraph summary
`POST /leads` writes to Postgres and `ZADD`s the lead id into `schedule:leads` with score = `next_attempt_at`. Every 200ms, a leader-elected promoter runs `ZRANGEBYSCORE 0 now`, `ZREM`s due leads, and `LPUSH`es them onto sharded `ready:leads:{shard}` lists. Worker tasks in every pod `BLPOP` from their shard's ready list, acquire a DB row lock, pass pre-checks, and `BLPOP` a token from `channel:{outbound_number_id}` before dialing. On the call-end webhook, the token is `LPUSH`ed back. Retries re-enter the ZSET. The existing `BackgroundTaskScheduler` runs the reconcilers that heal Redis-vs-DB drift.
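The score and shard routing in this flow can be sketched as pure helpers. The helper names and the fixed shard count are illustrative assumptions, not values from the design doc:

```python
import hashlib

NUM_SHARDS = 8  # assumption: the excerpt does not fix a shard count


def schedule_score(next_attempt_at_s: float) -> int:
    """ZSET score for schedule:leads: next_attempt_at as Unix milliseconds."""
    return int(next_attempt_at_s * 1000)


def shard_for(lead_id: str, num_shards: int = NUM_SHARDS) -> int:
    """Stable hash so a given lead always maps to the same ready:leads:{shard} list."""
    digest = hashlib.sha1(lead_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_shards
```

A stable hash matters here: if the shard assignment changed between promotion and a retry, a lead could bounce between worker pools.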
All live in `app/core/config/static.py` (env-var driven) unless marked otherwise.

## Feature flag plumbing

Use the existing DevCycle dynamic config (`app/core/config/dynamic.py`). Flag names:
- `bb_dispatch_enabled` (global, default false)
- `bb_dispatch_use_event_path_for_reseller_{reseller_id}` (per-reseller, default false)

Check the per-reseller flag in ingest (to decide whether to `ZADD`) and in cron (to decide whether to skip). Check the global flag in the worker to no-op if dispatching is paused.
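A minimal sketch of those two checks, assuming the dynamic config is exposed as a plain dict lookup (the real DevCycle client API will differ):

```python
def use_event_path(flags: dict, reseller_id: str) -> bool:
    """Per-reseller gate: ingest ZADDs (and cron skips) only when this is true."""
    return bool(flags.get(f"bb_dispatch_use_event_path_for_reseller_{reseller_id}", False))


def dispatch_enabled(flags: dict) -> bool:
    """Global kill-switch checked by workers before dialing."""
    return bool(flags.get("bb_dispatch_enabled", False))
```

Defaulting both flags to false means an unconfigured environment falls back to the existing cron path.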
|
|
| | Key | Type | Purpose | Produced by | Consumed by | Eviction | | ||
| |---|---|---|---|---|---| | ||
| | `bb:schedule:leads` | ZSET | Delayed queue of leads awaiting dispatch. Score = `next_attempt_at` in Unix ms. Member = lead id. | Ingest (`POST /leads`), Worker retry, Reconciler | Promoter | Never (members removed by `ZREM` on promotion or `ZREM` on cancel) | | ||
| | `bb:ready:leads:{shard}` | LIST | Ready-to-dispatch leads for a shard. FIFO. | Promoter | Worker `BLPOP` | Never (drained by workers) | |
```text
client ─────► POST /leads ──────► ┌──────────────────────────────────────┐
                                  │ insert into lead_call_tracker        │
                                  │ ZADD schedule:leads <ts> <id>        │
                                  │ return 201                           │
                                  └────────────────┬─────────────────────┘
                                                   │
                                                   ▼
                                  ┌──────────────────────────────────────┐
                                  │ PLANE 2 - TIME SCHEDULING            │
                                  │ Redis ZSET: schedule:leads           │
                                  │   score  = next_attempt_at_unix_ms   │
                                  │   member = lead_id                   │
                                  └────────────────┬─────────────────────┘
```
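The promoter step below this plane (ZRANGEBYSCORE, ZREM, LPUSH) can be modeled with in-memory stand-ins for the ZSET and the ready lists. In production this move is a single Lua script for atomicity; this model only illustrates the move semantics, and the toy shard function is an assumption:

```python
def promoter_tick(zset: dict, ready: dict, now_ms: int,
                  num_shards: int = 2, batch: int = 100) -> int:
    """Move due members (score <= now_ms) from the delayed ZSET to ready lists."""
    # ZRANGEBYSCORE 0 now, LIMIT 0 batch: due members in score order
    due = sorted((m for m, s in zset.items() if s <= now_ms), key=zset.get)[:batch]
    for lead_id in due:
        del zset[lead_id]                            # ZREM: the authoritative pick
        shard = sum(lead_id.encode()) % num_shards   # stable toy shard function
        ready.setdefault(shard, []).append(lead_id)  # LPUSH ready:leads:{shard}
    return len(due)
```

Members with scores in the future stay in the ZSET untouched, which is what makes the ZSET a delayed queue rather than a plain FIFO.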
| Reconciler | Interval | What it does |
|---|---|---|
| `reconcile_backlog_to_zset` | 60s | Scan BACKLOG rows not present in `schedule:leads`; re-enqueue them. Heals lost ingest events and worker crashes before the retry `ZADD`. |
| `reap_stuck_processing_lists` | 30s | Scan `processing:leads:*` for entries held > 5 min. Re-`ZADD` to the scheduler with `now`. Checks DB state first to avoid re-dispatching a lead that's already PROCESSING. |
| `reconcile_channel_tokens` | 60s | For each active outbound number, compute `M - in_flight_calls_from_db` vs `LLEN channel:{id}`. Top up leaked tokens. |
| `clean_stale_bb_locks` | 300s | Free rows with `is_locked=TRUE` and `locked_at` older than 10 min. Last line of defense against stuck locks. |
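The core check of `reconcile_backlog_to_zset` can be sketched as a pure function over DB rows and the current ZSET membership (the row shape and field names here are assumptions, not taken from the schema):

```python
def backlog_rows_to_reenqueue(backlog_rows, zset_members, now_ms):
    """Return (lead_id, score) pairs for BACKLOG rows missing from the ZSET.

    The score is the row's next_attempt_at, clamped up to now so overdue
    leads promote on the next tick instead of sorting into the past.
    """
    missing = []
    for row in backlog_rows:
        if row["id"] not in zset_members:
            missing.append((row["id"], max(int(row["next_attempt_at_ms"]), now_ms)))
    return missing
```

Because `ZADD` on an existing member only updates its score, re-enqueueing a lead that raced back into the ZSET is harmless.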
**User impact:** up to 5s of dispatch pause. Negligible.

**Guard against split-brain:** the lock is renewed every 2s with a CAS (`SET ... XX GET`, then compare the returned value to our own token). If renewal fails, the process stops promoting immediately. A brief overlap during hand-off is harmless because `ZREM` is the authoritative pick — two promoters racing just means the slower `ZREM` returns 0 and the lead is moved only once.
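A toy model of that renewal CAS, with a dict standing in for Redis (real code would issue `SET key token XX GET` with a TTL; TTL handling is elided here):

```python
def renew_leader_lock(store: dict, key: str, my_token: str) -> bool:
    """Refresh the lock only if it still exists, then verify we still own it."""
    previous = store.get(key)
    if previous is None:
        return False          # lock expired: stop promoting, let a peer take over
    store[key] = my_token     # the XX write (TTL refresh elided in this model)
    return previous == my_token
```

The comparison of the returned old value to our own token is what turns a plain refresh into a CAS: a `False` result means another promoter took the lock, so this process must stop promoting.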
| Key | Default | Effect |
|---|---|---|
| `bb:dispatch:enabled` | `true` | Kill-switch. If `false`, workers `BLPOP` but no-op. |
| `bb:promoter:paused` | absent | If it exists, the promoter skips its tick. |
| `bb:reseller:paused:{id}` | absent | If it exists, workers re-`ZADD` leads for this reseller without dispatching. |
1. Check startup logs for "channel semaphore init failed" or "promoter start failed".
2. If Redis was unreachable at startup, the semaphore init would have logged and retried. Confirm Redis is now healthy and restart the pod.
3. Confirm migrations have run (`026_event_dispatch_columns.sql`).
##### `DELETE /leads/{lead_id}`

Aborts a lead by ID ([`__init__.py`](app/api/routers/breeze_buddy/leads/__init__.py))
Actionable comments posted: 1
🧹 Nitpick comments (5)
docs/event-driven-dispatch/06-operations.md (1)

173-175: **Add a language identifier to the code block.** The code block showing the manual fix command is missing a language identifier. For consistency with the rest of the documentation and better rendering, please add `shell` or `bash` as the language identifier.

📝 Suggested improvement: change the opening fence of the **Manual fix** block containing `ZADD bb:schedule:leads <now_ms> <lead_id>` from ``` to ```shell.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In docs/event-driven-dispatch/06-operations.md around lines 173-175, update the markdown code fence that contains the Redis command `ZADD bb:schedule:leads <now_ms> <lead_id>` to include a language identifier (e.g., ```shell or ```bash): locate the exact fenced block containing that command and prepend the fence with the chosen identifier.

docs/event-driven-dispatch/03-data-model.md (1)
117-133: **Add a language identifier to the Lua script block.** The promoter tick script ("Promoter tick (Lua script for atomic multi-move)", beginning `-- KEYS[1] = bb:schedule:leads` and `-- ARGV[1] = now_ms`) is missing the `lua` language identifier. This improves syntax highlighting and documentation clarity. Note: the diff shows the identifier is already correct in the source; if this is not rendering properly, it may be a display issue.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In docs/event-driven-dispatch/03-data-model.md around lines 117-133, add the Lua language identifier to the code fence for the promoter tick script so the block starts with ```lua (i.e., ensure the fenced code block containing the Lua snippet that references KEYS[1], ARGV[1], ARGV[2], ZRANGEBYSCORE, ZREM, and the moved/count loop is annotated as lua).

docs/event-driven-dispatch/01-motivation.md (1)
23-32: **Consider adding a language identifier to the code block.** The timeline visualization (lines 23-32) uses a code block without a language identifier. While this is acceptable for ASCII art/diagrams, consider adding `text` as the identifier for consistency with documentation standards.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In docs/event-driven-dispatch/01-motivation.md around lines 23-32, add a language identifier to the timeline code block: locate the ASCII timeline block that begins with "t=0s cron fires -> SELECT returns 500 rows -> iterating starts" and change the opening fence to ```text to match docs standards and ensure consistent rendering.

docs/event-driven-dispatch/04-reliability.md (2)
184: **Verify the jitter calculation distributes load appropriately.** The jitter of `rand(0, 2000ms)` spreads 10k leads over 2 seconds. While this helps prevent a thundering herd, consider whether this spread is sufficient for very large reconciliation scenarios (e.g., after extended outages with >100k backlogged leads). The doc states this is "combined with the channel semaphore" for natural pacing, which should work, but it may be worth noting that the jitter window might need tuning based on actual scale.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In docs/event-driven-dispatch/04-reliability.md at line 184, make the jitter window configurable (replace the hardcoded `rand(0, 2000ms)` with a configurable parameter used in the ZADD score), scale it for large backlogs (e.g., increase the window when backlog size exceeds a threshold), and emit telemetry from the reconciler (backlog size, jitter window, distribution stats) so the actual dispatch spread can be verified and tuned together with the channel semaphore.

154: **Consider cross-referencing the reseller pause mechanism.** The mitigation mentions `bb:reseller:paused:{id}` but doesn't describe how workers check this flag or what happens when a reseller is paused. If this mechanism is detailed in another doc (e.g., 06-operations.md), consider adding a cross-reference here for completeness.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In docs/event-driven-dispatch/04-reliability.md at line 154, briefly state that workers should inspect `bb:reseller:paused:{id}` before processing a lead and re-ZADD leads back to the ZSET with the configured delay when the key exists, and add a cross-reference to the document that fully describes the pause/unpause flow (including any TTL, notification, and resume behavior).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/event-driven-dispatch/02-architecture.md`:
- Around line 156-220: The doc's worker loop describes Redis ZADD-based
scheduling but the code uses DB-driven retries via process_backlog_leads() which
creates new BACKLOG rows with create_lead_call_tracker() and increments
attempt_count; update the architecture doc (02-architecture.md) to describe the
DB-driven retry flow and remove/annotate the Redis ZADD semantics. Also fix the
rate-limiter TODO in process_backlog_leads()/create_lead_call_tracker(): when a
lead is rate-limited advance next_attempt_at (or set a backoff) before
re-creating a BACKLOG row so it isn’t immediately re-scheduled, and mention this
behavior in the doc (include references to process_backlog_leads,
create_lead_call_tracker, BACKLOG, and next_attempt_at).
---
Nitpick comments:
In `@docs/event-driven-dispatch/01-motivation.md`:
- Around line 23-32: Add a language identifier to the timeline code block in the
docs so the fenced block uses a labeled code fence (e.g., ```text) instead of an
unlabeled triple-backtick fence; locate the ASCII timeline block that begins
with "t=0s cron fires -> SELECT returns 500 rows -> iterating starts" and
change the opening fence to ```text to match docs standards and ensure
consistent rendering.
In `@docs/event-driven-dispatch/03-data-model.md`:
- Around line 117-133: Add the Lua language identifier to the code fence for the
promoter tick script in docs/event-driven-dispatch/03-data-model.md so the block
starts with ```lua (i.e., ensure the fenced code block containing the Lua
snippet that references KEYS[1], ARGV[1], ARGV[2], ZRANGEBYSCORE, ZREM, and the
moved/count loop is annotated as lua) to enable proper syntax highlighting and
documentation clarity.
In `@docs/event-driven-dispatch/04-reliability.md`:
- Line 184: The jitter calculation using rand(0, 2000ms) in the reconciler's
ZADD scoring may not sufficiently spread very large backlogs (>100k); update the
reconciler to make the jitter window configurable (replace hardcoded rand(0,
2000ms) with a configurable JITTER_MS or jitterMillis param used in the ZADD
score), add logic to scale the jitter window for large backlogs (e.g., increase
jitterMillis when backlogSize > threshold), and emit telemetry metrics from the
reconciler (backlogSize, jitterMillis, distribution stats) so you can verify the
actual dispatch spread and tune the channel semaphore + jitter together.
- Line 154: The doc mentions the reseller pause key bb:reseller:paused:{id} but
lacks details on how workers check this flag and what they do when a reseller is
paused; add a cross-reference to the document that fully describes the mechanism
(e.g., the operations doc that explains pause/unpause flow), and expand this
section to briefly state that workers should inspect bb:reseller:paused:{id}
before processing a lead, drop leads back to the ZSET with the configured delay
when the key exists, and link to the detailed procedure (including any TTL,
notification, and resume behavior) by referencing the target doc and the worker
behavior (worker loop that reads the ZSET and pause check).
In `@docs/event-driven-dispatch/06-operations.md`:
- Around line 173-175: Update the markdown code fence that contains the Redis
command "ZADD bb:schedule:leads <now_ms> <lead_id>" to include a language
identifier (e.g., use ```shell or ```bash) so the snippet is rendered
consistently with other docs; locate the exact fenced block containing that
command in 06-operations.md and prepend the fence with the chosen language
identifier.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
📒 Files selected for processing (9):

- CLAUDE.md
- docs/BREEZE_BUDDY_ARCHITECTURE.md
- docs/event-driven-dispatch/01-motivation.md
- docs/event-driven-dispatch/02-architecture.md
- docs/event-driven-dispatch/03-data-model.md
- docs/event-driven-dispatch/04-reliability.md
- docs/event-driven-dispatch/05-migration.md
- docs/event-driven-dispatch/06-operations.md
- docs/event-driven-dispatch/README.md
```text
while running:
    # 1. Receive an event - blocking, not polling
    lead_id = BLPOP ready:leads:{my_shard} timeout=30s
    if no lead_id: continue  # cycle for shutdown signal

    # 2. Move to reliability list for crash recovery
    RPUSH processing:leads:{worker_uuid} lead_id

    try:
        # 3. Load authoritative state from DB
        lead = SELECT ... FROM lead_call_tracker WHERE id = lead_id
        if lead.status != BACKLOG:
            continue  # already handled (e.g. cancelled)

        # 4. Acquire row lock (existing atomic UPDATE)
        if not acquire_lock_on_lead_by_id(lead_id, expected_status=BACKLOG):
            continue  # someone else owns it

        # 5. Pre-checks (unchanged)
        if not pass_pre_checks(lead):
            finalize_with_precheck_failure(lead)
            continue

        # 6. Calling-hours guard
        if outside_calling_hours(lead):
            ZADD schedule:leads <next_window_ms> lead_id
            release_row_lock(lead)
            continue

        # 7. Pick outbound number
        number = pick_outbound_number(lead)
        if not number:
            ZADD schedule:leads <now + 10s> lead_id
            release_row_lock(lead)
            continue

        # 8. Capacity gate
        token = BLPOP channel:{number.id} timeout=10s
        if not token:
            # All channels on this number are busy right now
            ZADD schedule:leads <now + jitter(1..3s)> lead_id
            release_row_lock(lead)
            continue

        # 9. Per-customer rate limit (existing sliding window)
        if rate_limited(lead.phone):
            LPUSH channel:{number.id} token
            ZADD schedule:leads <now + rate_limit_window_s> lead_id
            release_row_lock(lead)
            continue

        # 10. Make the call
        try:
            call_sid = provider.make_call(number, lead)
            UPDATE lead_call_tracker
               SET status=PROCESSING, call_id=call_sid
             WHERE id=lead_id AND status=BACKLOG
            # token stays held - released by call-end webhook
        except Exception:
            LPUSH channel:{number.id} token
            ZADD schedule:leads <now + backoff(lead.attempt_count)> lead_id

    finally:
        LREM processing:leads:{worker_uuid} 1 lead_id
```
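The `backoff(lead.attempt_count)` call in step 10 is not specified in this excerpt; one common choice is capped exponential backoff with full jitter. The base and cap values below are assumptions for illustration:

```python
import random


def backoff_ms(attempt_count: int, base_ms: int = 5_000, cap_ms: int = 300_000) -> int:
    """Delay before the retry ZADD: exponential in attempt_count, jittered, capped."""
    ceiling = min(cap_ms, base_ms * (2 ** attempt_count))
    return random.randint(base_ms, ceiling)
```

Jitter here serves the same purpose as in step 8: retries from a burst of failures spread out instead of re-arriving at the ZSET with identical scores.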
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Description: Check existing retry and backoff logic in the calls manager
echo "=== Checking retry logic in process_backlog_leads ==="
ast-grep --pattern 'def process_backlog_leads($$$):
  $$$
' app/ai/voice/agents/breeze_buddy/managers/calls.py
echo -e "\n=== Checking retry/backoff related code ==="
rg -n "attempt_count|retry|backoff" app/ai/voice/agents/breeze_buddy/managers/calls.py -C 2
echo -e "\n=== Checking rate limiter integration ==="
rg -n "rate_limit|call_limiter" app/ai/voice/agents/breeze_buddy/managers/calls.py -C 2
```

Repository: juspay/clairvoyance
Length of output: 50375
Clarify retry mechanism: documentation describes Redis scheduling but implementation uses database-driven retries.
The worker loop pseudocode (lines 156-220) describes scheduling retries via `ZADD schedule:leads <next_attempt_at>` in Redis. However, the actual implementation in `process_backlog_leads()` uses a different mechanism: it creates new BACKLOG rows in the database via `create_lead_call_tracker()` with an incremented `attempt_count`.

Align the architecture documentation with the actual DB-driven retry approach. Additionally, note the rate-limiter TODO at lines 584-586, where `next_attempt_at` should be advanced to prevent immediate re-scheduling after a rate-limit rejection.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/event-driven-dispatch/02-architecture.md` around lines 156 - 220, The
doc's worker loop describes Redis ZADD-based scheduling but the code uses
DB-driven retries via process_backlog_leads() which creates new BACKLOG rows
with create_lead_call_tracker() and increments attempt_count; update the
architecture doc (02-architecture.md) to describe the DB-driven retry flow and
remove/annotate the Redis ZADD semantics. Also fix the rate-limiter TODO in
process_backlog_leads()/create_lead_call_tracker(): when a lead is rate-limited
advance next_attempt_at (or set a backoff) before re-creating a BACKLOG row so
it isn’t immediately re-scheduled, and mention this behavior in the doc (include
references to process_backlog_leads, create_lead_call_tracker, BACKLOG, and
next_attempt_at).
Summary

- Adds `docs/event-driven-dispatch/` describing a Redis-backed event-driven replacement for the current cron-based backlog picker (`process_backlog_leads` + `/cron/initiate`). Covers motivation, architecture, data model, reliability/failure modes, phased dual-write migration, and operations runbooks.
- Fixes stale `/push/lead/v2` references in `CLAUDE.md` and `docs/BREEZE_BUDDY_ARCHITECTURE.md`. The actual endpoint is `POST /leads` (handled in `app/api/routers/breeze_buddy/leads/`).

Why

The current 30s cron + serial `process_backlog_leads` loop does not scale: each tick runs `SELECT ... WHERE status='BACKLOG' AND next_attempt_at <= NOW()` over the full BACKLOG set, and dispatch is serialized through the `BackgroundTaskScheduler` (see `managers/calls.py:584`).

The proposed design moves dispatch to an event path (Redis ZSET delayed queue → leader-elected promoter → sharded ready lists → worker pool → per-number channel semaphore). Hot-path DB polling is eliminated; telephony channel count becomes the only throughput limiter. The existing `BackgroundTaskScheduler` is retained for slow control-plane reconcilers.

Docs added

- README.md
- 01-motivation.md
- 02-architecture.md
- 03-data-model.md
- 04-reliability.md
- 05-migration.md
- 06-operations.md

Test plan

- Docs-only change; no code changes.
- Verified no remaining `/push/lead/v2` references in the repo (`rg 'push/lead'` returns nothing).

🤖 Generated with Claude Code
Summary by CodeRabbit

- Documentation: lead intake endpoint references updated to `POST /leads`.