feat: auto pause and system check #731
Walkthrough
Introduces a comprehensive service health monitoring and circuit breaker system. Detects STT pipeline failures, records failures via Redis-backed counters, activates fallback routing when thresholds are exceeded, auto-resets circuits periodically, and gates call processing based on global pause state. Integrates monitoring across agent pipelines, call managers, dynamic configuration, and startup tasks.
Sequence Diagram
sequenceDiagram
participant Pipeline as Pipeline<br/>(Agent)
participant HealthMon as ServiceHealthMonitor
participant Fallback as ServiceFallback
participant Redis as Redis
participant Alert as Alert System<br/>(Slack)
participant CallMgr as Call Manager
Note over Pipeline,CallMgr: Error Detection & Recording
Pipeline->>HealthMon: record_pipeline_error(processor, error, call_sid)
HealthMon->>Fallback: record_failure(error, context)
Fallback->>Redis: INCR failure_counter<br/>EXPIRE (TTL=window)
Fallback->>Redis: SET failure_alert (NX+EX)<br/>deduplicate alerts
Fallback->>Alert: emit failure_alert (async)
Note over Fallback,Redis: Threshold Check & Activation
alt Failure Count >= Threshold
Fallback->>Redis: SET active_flag (TTL=fallback_duration)
Fallback->>Redis: DEL failure_counter
Fallback->>Alert: emit circuit_trip_alert
Fallback-->>HealthMon: return true (activated)
else Failure Count < Threshold
Fallback-->>HealthMon: return false
end
Note over CallMgr,Redis: Global Pause Check
CallMgr->>HealthMon: is_globally_paused()
HealthMon->>Redis: EXISTS active_flag
alt Circuit Active
HealthMon-->>CallMgr: true
CallMgr->>CallMgr: skip_lead + release_lock
else Circuit Inactive
HealthMon-->>CallMgr: false
CallMgr->>CallMgr: proceed_normally
end
Note over HealthMon,Redis: Auto-Reset Background Task
HealthMon->>Redis: check_all_active_circuits
alt Auto-Reset Triggered
HealthMon->>Redis: DEL active_flag
HealthMon->>Alert: emit circuit_reset_alert
end
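The record-and-trip portion of the diagram can be sketched as follows. This is a minimal illustration and not the PR's `ServiceFallback` code: `FakeRedis`, the `health:<service>:failures` key name, and starting the window on the first failure are all assumptions made for the example; only the `health:<service>:active` key shape appears in the review.

```python
from typing import Dict, Optional, Tuple


class FakeRedis:
    """In-memory stand-in for the handful of Redis commands in the diagram."""

    def __init__(self) -> None:
        self.now = 0.0  # manual clock keeps the example deterministic
        self._data: Dict[str, Tuple[int, Optional[float]]] = {}

    def _live(self, key: str) -> Optional[Tuple[int, Optional[float]]]:
        entry = self._data.get(key)
        if entry and entry[1] is not None and self.now >= entry[1]:
            del self._data[key]  # lazily expire, like a TTL running out
            return None
        return entry

    def incr(self, key: str) -> int:
        value, expires_at = self._live(key) or (0, None)
        self._data[key] = (value + 1, expires_at)
        return value + 1

    def expire(self, key: str, seconds: float) -> None:
        entry = self._live(key)
        if entry:
            self._data[key] = (entry[0], self.now + seconds)

    def set(self, key: str, value: int, ex: float) -> None:
        self._data[key] = (value, self.now + ex)

    def exists(self, key: str) -> bool:
        return self._live(key) is not None

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def record_failure(
    redis: FakeRedis,
    service: str,
    threshold: int,
    window_secs: float,
    fallback_secs: float,
) -> bool:
    """Count one failure; return True if this failure tripped the circuit."""
    counter_key = f"health:{service}:failures"  # assumed key name
    active_key = f"health:{service}:active"
    count = redis.incr(counter_key)
    if count == 1:
        # Assumption: the window starts with the first failure.
        redis.expire(counter_key, window_secs)
    if count >= threshold:
        redis.set(active_key, 1, ex=fallback_secs)  # pause with a TTL
        redis.delete(counter_key)
        return True
    return False
```

With a threshold of 3, the third failure inside the window sets the active flag, and the flag disappears by itself once `fallback_secs` has passed.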
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pull request overview
Adds a Redis-backed “service health” circuit-breaker layer intended to auto-pause outbound calls when upstream providers (TTS/LLM/telephony) exhibit repeated failures, building on the existing STT fallback work referenced in dependency PR #647.
Changes:
- Introduces a generic `ServiceFallback` (Redis + TTL) utility and registers an STT fallback reset background task.
- Adds `service_health` package (rules + monitor) and a global “pause outbound calls” check in Breeze Buddy backlog processing.
- Extends Breeze Buddy `on_pipeline_error` handling to detect STT failures and end calls (Phase 1 behavior).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| app/services/service_health/rules.json | Defines per-service thresholds/windows for health rules. |
| app/services/service_health/monitor.py | Implements service health monitor (circuit breaker) and task registration. |
| app/services/service_health/__init__.py | Exposes monitor/task initializer as package API. |
| app/services/fallback/__init__.py | Adds generic Redis-backed fallback/circuit utility + STT reset task registration. |
| app/main.py | Wires STT fallback task initialization into app lifespan (service health import added). |
| app/core/config/dynamic.py | Adds Redis-backed flags for enabling service health + auto-resume window. |
| app/ai/voice/agents/breeze_buddy/managers/calls.py | Skips backlog lead processing when a global pause is active. |
| app/ai/voice/agents/breeze_buddy/agent/__init__.py | Adds STT error detection and failure recording/end-call behavior. |
| from app.ai.voice.agents.breeze_buddy.stt.fallback import ( | ||
| ALERT_STT_TERMINAL_FAILURE, | ||
| STT_FALLBACK_SLACK_TAG, | ||
| record_stt_failure, | ||
| send_templated_alert, | ||
| ) | ||
| from app.services.service_health import service_health_monitor |
Several newly added imports appear unused in this module (ALERT_STT_TERMINAL_FAILURE, STT_FALLBACK_SLACK_TAG, send_templated_alert, and service_health_monitor unless used elsewhere). If they aren’t used later in the file, they should be removed to avoid confusion and keep imports accurate.
| from app.ai.voice.agents.breeze_buddy.stt.fallback import ( | |
| ALERT_STT_TERMINAL_FAILURE, | |
| STT_FALLBACK_SLACK_TAG, | |
| record_stt_failure, | |
| send_templated_alert, | |
| ) | |
| from app.services.service_health import service_health_monitor | |
| from app.ai.voice.agents.breeze_buddy.stt.fallback import record_stt_failure |
| async def check_and_reset_stt_fallback() -> None: | ||
| """Check if STT fallback is active and reset to primary if so.""" | ||
| try: | ||
| fallback_provider = await BB_STT_FALLBACK_PROVIDER() | ||
| fallback = ServiceFallback( | ||
| ServiceFallbackConfig( | ||
| service_name="stt", | ||
| failure_threshold=await BB_STT_FALLBACK_THRESHOLD(), | ||
| failure_window_secs=await BB_STT_FALLBACK_WINDOW_SECS(), | ||
| fallback_duration_secs=await BB_STT_FALLBACK_DURATION_SECS(), | ||
| fallback_provider_name=fallback_provider, | ||
| ) | ||
| ) | ||
| if not await fallback.is_active(): | ||
| return | ||
|
|
||
| logger.info("STT fallback active — resetting to primary provider") | ||
| await fallback.reset_to_primary() | ||
| except Exception as e: |
check_and_reset_stt_fallback() calls reset_to_primary() whenever the active key exists. Because _activate() already sets a TTL on the active key, this scheduled task can clear the fallback early depending on scheduler alignment, reducing the intended cooldown and potentially flapping providers. Consider either relying on the TTL alone (and adjusting how reset alerts are emitted), or only resetting when the cooldown has actually elapsed (e.g., by checking the Redis TTL / stored activation timestamp).
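One way to gate the reset on the remaining cooldown is to interpret the Redis `TTL` reply for the active key before deciding. The helper below is a sketch under that assumption; `should_reset` and `grace_secs` are hypothetical names, and treating a key without expiry as "leave alone" is a design choice made for the example.

```python
def should_reset(ttl_reply: int, grace_secs: int = 0) -> bool:
    """Decide whether a scheduled task may clear the active key.

    ttl_reply follows Redis TTL semantics:
      -2  the key does not exist (already expired, nothing to reset)
      -1  the key exists but has no expiry (left for a manual resume here)
      >=0 seconds remaining before the key expires
    """
    if ttl_reply < 0:
        return False
    # Only reset when the cooldown is over, or within the grace window.
    return ttl_reply <= grace_secs
```

With `grace_secs=0` the task almost never clears the key itself, which amounts to relying on the TTL alone; a grace equal to the scheduler interval lets the task emit the reset alert just before the key would expire.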
| # Check global service health pause (circuit breaker pattern) | ||
| if await service_health_monitor.is_globally_paused(): | ||
| logger.info( | ||
| f"Skipping lead {locked_lead.id} - calls are globally paused due to service health" | ||
| ) | ||
| await release_lock_on_lead_by_id(locked_lead.id) | ||
| continue |
The global pause check happens only after acquiring a per-lead DB lock. When calls are paused, this will still lock and unlock every backlog lead on each run, adding avoidable DB load. Consider checking is_globally_paused() once near the start of process_backlog_leads() (before querying/locking leads) and returning early (or sleeping) when paused.
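The reordering suggested here can be sketched as below. The three callables are hypothetical stand-ins for the monitor check, the query-and-lock step, and the per-lead work; the real `process_backlog_leads()` signature is not shown in the review.

```python
import asyncio
from typing import Awaitable, Callable, List


async def process_backlog_leads(
    is_globally_paused: Callable[[], Awaitable[bool]],
    lock_next_leads: Callable[[], Awaitable[List[str]]],
    handle_lead: Callable[[str], Awaitable[None]],
) -> int:
    """Return the number of leads handled; 0 when calls are paused."""
    # Check the pause flag once, before any lead is queried or locked.
    if await is_globally_paused():
        return 0
    leads = await lock_next_leads()
    for lead_id in leads:
        await handle_lead(lead_id)
    return len(leads)
```

When the circuit is open, this version performs a single Redis lookup per run and never touches the lead locks.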
| # LLM providers | ||
| "azure": "llm", | ||
| "openaillm": "llm", | ||
| "googlellm": "llm", | ||
| # Telephony transports (Twilio, Exotel, Plivo) |
PROCESSOR_RULE_MAP uses exact string matches like "azure" / "openaillm". In pipeline errors, processor often appears as a class name (e.g. AzureLLMService, OpenAILLMService), so this mapping may never match and failures won’t be recorded. Consider normalizing to processor.__class__.__name__.lower() and/or adding the expected class-name keys (e.g. azurellmservice, openaillmservice) or switching to substring matching with a controlled allowlist.
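A sketch of the normalization idea, assuming the processor arrives either as an object or as a class-name string. `PROCESSOR_SUBSTRING_RULES` and `resolve_rule` are hypothetical names for illustration, and the allowlist entries simply mirror the LLM keys quoted above.

```python
from typing import Optional, Tuple

# Hypothetical allowlist: (substring of the lowercased name, rule name).
PROCESSOR_SUBSTRING_RULES: Tuple[Tuple[str, str], ...] = (
    ("azurellm", "llm"),
    ("openaillm", "llm"),
    ("googlellm", "llm"),
)


def resolve_rule(processor: object) -> Optional[str]:
    """Map a processor (object or name string) to a health rule, if any."""
    name = processor if isinstance(processor, str) else type(processor).__name__
    normalized = name.lower()
    for needle, rule in PROCESSOR_SUBSTRING_RULES:
        if needle in normalized:
            return rule
    return None  # unknown processors are ignored rather than guessed
```

Keeping the substrings in an explicit allowlist avoids the over-matching that a loose keyword such as "google" could cause across STT, TTS, and LLM processors.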
| """ | ||
|
|
||
| import json | ||
| from datetime import datetime, timezone |
datetime and timezone are imported but not used in this module. Removing unused imports will keep the module clean and avoid misleading readers about time-based logic in this file.
| from datetime import datetime, timezone |
|
|
||
| # Initialize STT fallback reset tasks | ||
| await initialize_fallback_tasks(_background_scheduler) | ||
|
|
initialize_service_health_tasks is imported but never invoked in the lifespan startup, so the service health monitoring background task will never be registered (and the import is currently unused). If the monitor is intended to run, it should be initialized alongside the fallback/langfuse tasks when the background scheduler is created.
| # Initialize service health monitoring tasks | |
| await initialize_service_health_tasks(_background_scheduler) |
| ) | ||
| is_stt_error = any(kw in processor_str for kw in stt_keywords) | ||
|
|
||
| if not is_stt_error: |
service_health_monitor is imported but never used in on_pipeline_error. As a result, non‑STT pipeline errors (TTS/LLM/telephony) aren’t recorded into the service health circuits, so auto‑pause can’t trigger from those failures. Consider calling service_health_monitor.record_pipeline_error(...) for non‑STT errors before returning.
| if not is_stt_error: | |
| if not is_stt_error: | |
| try: | |
| service_health_monitor.record_pipeline_error( | |
| processor=str(processor), | |
| error_message=str(error_msg), | |
| ) | |
| except Exception as health_err: | |
| logger.warning( | |
| f"Failed to record non-STT pipeline error in service health monitor: {health_err}" | |
| ) |
| async def reset_to_primary(self) -> None: | ||
| """Reset to primary: clear fallback flag.""" | ||
| try: | ||
| redis = await get_redis_service() | ||
|
|
||
| # Clear fallback flag | ||
| await redis.delete(self._key_active) | ||
| # Clear failure counter | ||
| await redis.delete(self._key_failure_count) | ||
| # Clear notification dedup key | ||
| await redis.delete(self._key_notified) | ||
|
|
||
| logger.info( | ||
| f"Service fallback ({self.config.service_name}) " f"reset to primary" | ||
| ) | ||
|
|
||
| # Reset alert | ||
| if self.config.on_reset_alert: | ||
| try: | ||
| await self.config.on_reset_alert( | ||
| service_name=self.config.service_name, | ||
| ) | ||
| except Exception as alert_err: | ||
| logger.warning( | ||
| f"Service fallback ({self.config.service_name}) " | ||
| f"reset alert failed: {alert_err}" | ||
| ) |
reset_to_primary() always triggers the reset alert callback even if the fallback/circuit was not active. This can create noisy or misleading “reset/resumed” alerts when callers invoke reset defensively or when resume_calls() iterates over inactive circuits. Consider checking whether the active flag existed (or was deleted) before emitting a reset alert.
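Because Redis `DEL` returns the number of keys it removed, the reset can use that reply to decide whether an alert is warranted. The sketch below assumes an async client whose `delete(*names)` returns that count; `FakeAsyncRedis` and the function signature are illustrative, not the PR's code.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Set


class FakeAsyncRedis:
    """Minimal async stand-in whose delete() returns the removed-key count."""

    def __init__(self, keys: Iterable[str] = ()) -> None:
        self.keys: Set[str] = set(keys)

    async def delete(self, *names: str) -> int:
        removed = sum(1 for name in names if name in self.keys)
        self.keys.difference_update(names)
        return removed


async def reset_to_primary(
    redis: FakeAsyncRedis,
    active_key: str,
    other_keys: Iterable[str],
    on_reset_alert: Callable[[], Awaitable[None]],
) -> bool:
    """Clear circuit state; alert only if the circuit was actually active."""
    was_active = await redis.delete(active_key) > 0
    extra = list(other_keys)
    if extra:
        await redis.delete(*extra)  # counter and dedup keys
    if was_active:
        await on_reset_alert()
    return was_active
```

A defensive second call finds nothing to delete and therefore stays silent.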
| """Evaluate all circuits and auto-reset if clean.""" | ||
| if not await ENABLE_SERVICE_HEALTH_MONITORING(): | ||
| return | ||
|
|
||
| for rule, circuit in _CIRCUITS.items(): | ||
| if not await circuit.is_active(): | ||
| continue | ||
| # Check if clean (no recent failures) - ServiceFallback TTL handles this | ||
| await circuit.reset_to_primary() | ||
|
|
||
|
|
run_auto_health_check() resets every active circuit unconditionally. Because ServiceFallback._activate() sets the active flag with a TTL, this will clear the pause almost immediately (on the next 60s check), effectively bypassing SERVICE_HEALTH_AUTO_RESUME_MINUTES. This should only reset when the cooldown has actually elapsed (e.g., based on Redis TTL/activation timestamp), or the task should be removed/changed to only emit a reset alert rather than deleting the active key.
| """Evaluate all circuits and auto-reset if clean.""" | |
| if not await ENABLE_SERVICE_HEALTH_MONITORING(): | |
| return | |
| for rule, circuit in _CIRCUITS.items(): | |
| if not await circuit.is_active(): | |
| continue | |
| # Check if clean (no recent failures) - ServiceFallback TTL handles this | |
| await circuit.reset_to_primary() | |
| """Evaluate all circuits without clearing active pauses. | |
| Active circuit state is managed by ServiceFallback via a Redis TTL. Resetting | |
| the circuit from this periodic task would bypass the configured | |
| SERVICE_HEALTH_AUTO_RESUME_MINUTES cooldown, so this health check is | |
| intentionally observational only. | |
| """ | |
| if not await ENABLE_SERVICE_HEALTH_MONITORING(): | |
| return | |
| auto_resume_minutes = await SERVICE_HEALTH_AUTO_RESUME_MINUTES() | |
| for rule, circuit in _CIRCUITS.items(): | |
| if not await circuit.is_active(): | |
| continue | |
| logger.info( | |
| "[ServiceHealth] Circuit '%s' remains active; waiting for TTL-based " | |
| "auto-resume after %s minute(s)", | |
| rule, | |
| auto_resume_minutes, | |
| ) |
| await circuit._activate(await get_redis_service()) | ||
|
|
pause_calls() calls the private ServiceFallback._activate() method. This makes the service health layer depend on fallback internals and makes it harder to change ServiceFallback safely. Prefer adding a public method on ServiceFallback (e.g., activate() / open_circuit()) and calling that here.
| await circuit._activate(await get_redis_service()) | |
| # Open the circuit through the public ServiceFallback API instead of | |
| # depending on the private _activate() implementation detail. | |
| threshold_count = int(config.get("threshold_count", 1)) | |
| for _ in range(max(threshold_count, 1)): | |
| await circuit.record_failure( | |
| error_msg=reason, | |
| call_sid="", | |
| context=f"manual-pause:{paused_by}", | |
| ) |
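An alternative to replaying `record_failure` is the public method mentioned in the comment. The class below is an illustrative stand-in, not the real `ServiceFallback`; the `activate()` name comes from the comment's own suggestion, while the attributes and the `pause_calls` signature are assumptions.

```python
import asyncio


class ServiceFallbackSketch:
    """Illustrative stand-in; not the real ServiceFallback class."""

    def __init__(self, service_name: str) -> None:
        self.service_name = service_name
        self.active = False
        self.last_reason = ""

    async def _activate(self, reason: str) -> None:
        # Internal detail: the real class would SET the active key with a TTL.
        self.active = True
        self.last_reason = reason

    async def activate(self, reason: str) -> None:
        """Public entry point so callers never reach into _activate()."""
        await self._activate(reason)


async def pause_calls(circuit: ServiceFallbackSketch, reason: str, paused_by: str) -> None:
    """Open the circuit manually through the public API."""
    await circuit.activate(f"manual-pause:{paused_by}: {reason}")
```

Compared with looping over `record_failure`, a dedicated method opens the circuit in one step and does not emit a failure alert for each synthetic failure.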
Actionable comments posted: 4
🧹 Nitpick comments (2)
app/services/service_health/monitor.py (2)
191-191: 💤 Low value. Rename unused loop variable `rule` to `_rule`.
Per the static analysis hint, the loop control variable `rule` is not used within the loop body.
♻️ Proposed fix
- for rule, circuit in _CIRCUITS.items():
+ for _rule, circuit in _CIRCUITS.items():
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/service_health/monitor.py` at line 191, The loop in function iterating over _CIRCUITS.items() uses an unused variable named rule; rename that loop control variable from rule to _rule to satisfy static analysis and clarify intent (e.g., change "for rule, circuit in _CIRCUITS.items()" to "for _rule, circuit in _CIRCUITS.items()" in monitor.py where the loop is defined).
79-82: 💤 Low value. Minor: Comment doesn't match actual key prefix.
The comment says "Use circuit: prefix instead of fallback:" but `key_prefix` is set to `"health"`, resulting in keys like `health:elevenlabs:active`.
♻️ Fix the comment
 _CIRCUITS[rule] = ServiceFallback(
     ServiceFallbackConfig(
         service_name=rule,
-        key_prefix="health",  # Use circuit: prefix instead of fallback:
+        key_prefix="health",  # Use health: prefix instead of fallback:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/service_health/monitor.py` around lines 79 - 82, The inline comment next to the ServiceFallbackConfig key_prefix is incorrect: it currently says "Use circuit: prefix instead of fallback:" while key_prefix is set to "health", causing mismatch; update the comment in the _CIRCUITS initialization (where ServiceFallback and ServiceFallbackConfig are constructed) to accurately reflect the actual key_prefix value (e.g., "Use health: prefix for keys" or similar) so the comment matches the key_prefix="health".
| # Detect STT errors by processor name keywords | ||
| processor_str = str(processor).lower() | ||
| stt_keywords = ( | ||
| "stt", | ||
| "soniox", | ||
| "deepgram", | ||
| "transcri", | ||
| "google", | ||
| "sarvam", | ||
| ) | ||
| is_stt_error = any(kw in processor_str for kw in stt_keywords) | ||
|
|
||
| if not is_stt_error: | ||
| return | ||
|
|
||
| logger.warning(f"STT error detected from processor: {processor}") | ||
|
|
||
| # Record failure in fallback system (once per call, Soniox only) | ||
| if self.stt_provider == "soniox" and not self._stt_failure_recorded: | ||
| self._stt_failure_recorded = True | ||
| try: | ||
| await record_stt_failure( | ||
| error_msg=str(error_msg)[:200], | ||
| call_sid=self.call_sid or "", | ||
| context="mid-call", | ||
| ) | ||
| except Exception as fb_err: | ||
| logger.warning(f"STT fallback record_failure failed: {fb_err}") | ||
|
|
||
| # Alert and end call — no mid-call swap in Phase 1 | ||
| fire_and_forget(self._send_mid_call_stt_alert()) | ||
| try: | ||
| await task.queue_frames([EndFrame()]) | ||
| except Exception: | ||
| pass |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if fire_and_forget is defined or imported anywhere in the codebase
echo "=== Searching for fire_and_forget definition/import ==="
rg -n "fire_and_forget" --type=py
echo ""
echo "=== Checking EndFrame import in agent/__init__.py ==="
rg -n "from pipecat.frames.frames import" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Checking stt_provider attribute initialization ==="
rg -n "self\.stt_provider" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Checking _stt_failure_recorded attribute initialization ==="
rg -n "self\._stt_failure_recorded" app/ai/voice/agents/breeze_buddy/agent/__init__.py
Repository: juspay/clairvoyance
Length of output: 2978
🏁 Script executed:
#!/bin/bash
# Get first 100 lines to see all imports
echo "=== First 100 lines (imports section) ==="
head -100 app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Check if fire_and_forget is defined in agent/__init__.py ==="
rg -n "def fire_and_forget" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for __init__ method and surrounding context ==="
rg -n "def __init__" app/ai/voice/agents/breeze_buddy/agent/__init__.py
Repository: juspay/clairvoyance
Length of output: 3709
🏁 Script executed:
#!/bin/bash
# Read the __init__ method body (lines 116-200 to capture most of it)
echo "=== __init__ method body (lines 116-250) ==="
sed -n '116,250p' app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for fire_and_forget definition anywhere in file ==="
grep -n "def fire_and_forget\|fire_and_forget.*=" app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo ""
echo "=== Search for any fire_and_forget function or import in entire file ==="
grep -n "fire_and_forget" app/ai/voice/agents/breeze_buddy/agent/__init__.py
Repository: juspay/clairvoyance
Length of output: 5457
🏁 Script executed:
#!/bin/bash
# Verify the exact line range (613-647) matches the code snippet shown in review
echo "=== Lines 613-647 to verify against review snippet ==="
sed -n '613,647p' app/ai/voice/agents/breeze_buddy/agent/__init__.py
Repository: juspay/clairvoyance
Length of output: 1466
Critical: Undefined names and attributes will cause runtime errors.
The code contains multiple undefined names and attributes that will raise NameError and AttributeError at runtime:
- `fire_and_forget` (line 643): not imported or defined anywhere in the file
- `EndFrame` (line 645): not imported from `pipecat.frames.frames`
- `self.stt_provider` (line 631): not initialized in `__init__`
- `self._stt_failure_recorded` (line 632): not initialized in `__init__`
The silent except: pass (lines 646-647) will also swallow errors without any indication of failure.
Proposed fixes
Add missing import:
-from pipecat.frames.frames import LLMMessagesAppendFrame, TTSSpeakFrame
+from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
Import or define fire_and_forget:
+import asyncio
+
+def fire_and_forget(coro):
+    """Schedule coroutine without awaiting."""
+    asyncio.create_task(coro)
Initialize attributes in __init__ after line 195 (error tracking section):
 # Error tracking
 self.errors: List[Dict[str, Any]] = []
+
+# STT fallback tracking
+self.stt_provider: Optional[str] = None
+self._stt_failure_recorded: bool = False
Replace silent exception:
-try:
-    await task.queue_frames([EndFrame()])
-except Exception:
-    pass
+try:
+    await task.queue_frames([EndFrame()])
+except Exception as e:
+    logger.warning(f"Failed to queue EndFrame: {e}")
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 639-639: Do not catch blind exception: Exception
(BLE001)
[error] 643-643: Undefined name fire_and_forget
(F821)
[error] 645-645: Undefined name EndFrame
(F821)
[error] 646-647: try-except-pass detected, consider logging the exception
(S110)
[warning] 646-646: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 613 - 647,
The block handling STT errors uses undefined symbols and uninitialized
attributes: import or define fire_and_forget and EndFrame (from
pipecat.frames.frames) so calls to
fire_and_forget(self._send_mid_call_stt_alert()) and EndFrame() resolve;
initialize self.stt_provider and self._stt_failure_recorded in the class
__init__ (e.g., set default provider string and False) so checks in that block
won't raise AttributeError; and replace the silent "except: pass" around await
task.queue_frames([EndFrame()]) with a specific exception handler that logs the
error (use logger.warning or logger.exception) to avoid swallowing failures.
Ensure references to record_stt_failure and _send_mid_call_stt_alert remain
unchanged.
| # Initialize STT fallback reset tasks | ||
| await initialize_fallback_tasks(_background_scheduler) | ||
|
|
||
| ### Register new tasks here |
Missing call to initialize_service_health_tasks.
initialize_service_health_tasks is imported on line 69 but never invoked. The service health background task (check_and_reset_circuits) will not be registered, meaning circuits will never auto-reset based on the configured schedule.
🐛 Proposed fix to register service health tasks
# Initialize STT fallback reset tasks
await initialize_fallback_tasks(_background_scheduler)
+ # Initialize service health check tasks
+ await initialize_service_health_tasks(_background_scheduler)
+
 ### Register new tasks here
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/main.py` around lines 172 - 175, The import
initialize_service_health_tasks is never invoked, so the service health job
(check_and_reset_circuits) isn’t registered; add a call to await
initialize_service_health_tasks(_background_scheduler) (similar to the existing
await initialize_fallback_tasks(_background_scheduler)) in the initialization
sequence (e.g., immediately after initialize_fallback_tasks) so the background
scheduler registers the check_and_reset_circuits task.
| from app.core.config.dynamic import ( | ||
| BB_STT_FALLBACK_DURATION_SECS, | ||
| BB_STT_FALLBACK_PROVIDER, | ||
| BB_STT_FALLBACK_THRESHOLD, | ||
| BB_STT_FALLBACK_WINDOW_SECS, | ||
| ENABLE_BB_STT_FALLBACK, | ||
| ) |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if STT fallback config functions exist in dynamic.py
echo "=== Checking for STT fallback config functions in dynamic.py ==="
rg -n "BB_STT_FALLBACK|ENABLE_BB_STT_FALLBACK" app/core/config/dynamic.py
echo ""
echo "=== Full list of async def functions in dynamic.py ==="
rg -n "^async def " app/core/config/dynamic.py
Repository: juspay/clairvoyance
Length of output: 2803
🏁 Script executed:
# First, read the actual file to confirm the imports shown in the review
head -35 app/services/fallback/__init__.py
Repository: juspay/clairvoyance
Length of output: 1475
🏁 Script executed:
# Search for the missing config functions anywhere in the codebase
rg -r "BB_STT_FALLBACK_DURATION_SECS|BB_STT_FALLBACK_PROVIDER|BB_STT_FALLBACK_THRESHOLD|BB_STT_FALLBACK_WINDOW_SECS|ENABLE_BB_STT_FALLBACK" --type py
Repository: juspay/clairvoyance
Length of output: 460
🏁 Script executed:
# Check if app/services/fallback/__init__.py exists and its size
wc -l app/services/fallback/__init__.py
Repository: juspay/clairvoyance
Length of output: 101
🏁 Script executed:
# Search for where these config functions are defined (not imported)
rg "async def BB_STT_FALLBACK_DURATION_SECS|async def BB_STT_FALLBACK_PROVIDER|async def BB_STT_FALLBACK_THRESHOLD|async def BB_STT_FALLBACK_WINDOW_SECS|async def ENABLE_BB_STT_FALLBACK" --type py
Repository: juspay/clairvoyance
Length of output: 45
🏁 Script executed:
# More comprehensive search - look for these constants/functions anywhere
rg "BB_STT_FALLBACK_DURATION_SECS|ENABLE_BB_STT_FALLBACK" --type py
Repository: juspay/clairvoyance
Length of output: 486
Critical: Missing dynamic configuration functions will cause ImportError.
The imports on lines 23-29 reference configuration functions that do not exist anywhere in the codebase:
- `BB_STT_FALLBACK_DURATION_SECS`
- `BB_STT_FALLBACK_PROVIDER`
- `BB_STT_FALLBACK_THRESHOLD`
- `BB_STT_FALLBACK_WINDOW_SECS`
- `ENABLE_BB_STT_FALLBACK`
These functions are not defined in app/core/config/dynamic.py (only ENABLE_SERVICE_HEALTH_MONITORING and SERVICE_HEALTH_AUTO_RESUME_MINUTES were added). Add the missing async config functions to app/core/config/dynamic.py before this module can be imported.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/fallback/__init__.py` around lines 23 - 29, Add the missing
async config accessors to app/core/config/dynamic.py: implement async functions
named ENABLE_BB_STT_FALLBACK(), BB_STT_FALLBACK_PROVIDER(),
BB_STT_FALLBACK_THRESHOLD(), BB_STT_FALLBACK_WINDOW_SECS(), and
BB_STT_FALLBACK_DURATION_SECS() that return the expected typed config values
(bool, str, int/float as appropriate) by reading from your existing settings/env
or configuration store; ensure they are declared async, have correct return type
hints, provide sensible defaults, and are exported (update __all__ if the module
uses it) so importing these symbols from app.services.fallback.__init__ will not
raise ImportError.
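The shape of the missing accessors might look like the sketch below. How `app/core/config/dynamic.py` actually reads its values is not shown in this review, so the `_FAKE_STORE` dictionary, the `_get` helper, and every default value are placeholders chosen only to make the example runnable.

```python
import asyncio
from typing import Dict

# Placeholder for the real configuration backend (assumption).
_FAKE_STORE: Dict[str, str] = {"BB_STT_FALLBACK_THRESHOLD": "5"}


async def _get(key: str, default: str) -> str:
    """Hypothetical lookup; the real module would query its own store."""
    return _FAKE_STORE.get(key, default)


async def ENABLE_BB_STT_FALLBACK() -> bool:
    return (await _get("ENABLE_BB_STT_FALLBACK", "false")).lower() == "true"


async def BB_STT_FALLBACK_PROVIDER() -> str:
    return await _get("BB_STT_FALLBACK_PROVIDER", "")  # placeholder default


async def BB_STT_FALLBACK_THRESHOLD() -> int:
    return int(await _get("BB_STT_FALLBACK_THRESHOLD", "3"))  # placeholder


async def BB_STT_FALLBACK_WINDOW_SECS() -> int:
    return int(await _get("BB_STT_FALLBACK_WINDOW_SECS", "60"))  # placeholder


async def BB_STT_FALLBACK_DURATION_SECS() -> int:
    return int(await _get("BB_STT_FALLBACK_DURATION_SECS", "300"))  # placeholder
```

Each accessor is awaited at the call site, which matches how `check_and_reset_stt_fallback()` builds its `ServiceFallbackConfig`.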
| async def run_auto_health_check(self) -> None: | ||
| """Evaluate all circuits and auto-reset if clean.""" | ||
| if not await ENABLE_SERVICE_HEALTH_MONITORING(): | ||
| return | ||
|
|
||
| for rule, circuit in _CIRCUITS.items(): | ||
| if not await circuit.is_active(): | ||
| continue | ||
| # Check if clean (no recent failures) - ServiceFallback TTL handles this | ||
| await circuit.reset_to_primary() | ||
|
|
Auto health check unconditionally resets all active circuits.
The run_auto_health_check method calls reset_to_primary() on every active circuit without verifying whether failures have actually subsided. The comment mentions "ServiceFallback TTL handles this," but then immediately resets anyway.
Since the _key_active Redis key already has a TTL (fallback_duration_secs) that auto-expires, calling reset_to_primary() every 60 seconds will:
- Clear circuits before their configured duration expires
- Fire reset alerts prematurely
If the intent is to let TTL handle expiration, this loop should only fire the reset alert when the key is about to expire or has expired, not force-reset every active circuit.
🔧 Suggested approach
Either remove the unconditional reset:
 async def run_auto_health_check(self) -> None:
     """Evaluate all circuits and auto-reset if clean."""
     if not await ENABLE_SERVICE_HEALTH_MONITORING():
         return
-    for rule, circuit in _CIRCUITS.items():
-        if not await circuit.is_active():
-            continue
-        # Check if clean (no recent failures) - ServiceFallback TTL handles this
-        await circuit.reset_to_primary()
+    # TTL on the active key handles auto-expiration.
+    # This task now only serves to fire reset alerts when keys expire.
+    for _rule, circuit in _CIRCUITS.items():
+        # Check if circuit was active but has now expired (TTL elapsed)
+        # No action needed - key self-expired
+        pass
Or add actual health-check logic (e.g., probe the service):
 async def run_auto_health_check(self) -> None:
-    """Evaluate all circuits and auto-reset if clean."""
+    """Probe services and reset circuits if healthy."""
     if not await ENABLE_SERVICE_HEALTH_MONITORING():
         return
-    for rule, circuit in _CIRCUITS.items():
+    for _rule, circuit in _CIRCUITS.items():
         if not await circuit.is_active():
             continue
-        # Check if clean (no recent failures) - ServiceFallback TTL handles this
-        await circuit.reset_to_primary()
+        # TODO: Add actual health probe logic here
+        # Only reset if probe succeeds
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 191-191: Loop control variable rule not used within loop body
Rename unused rule to _rule
(B007)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/service_health/monitor.py` around lines 186 - 196,
run_auto_health_check currently calls reset_to_primary() for every active
circuit in _CIRCUITS unconditionally, which clears circuits and fires reset
alerts prematurely; instead, fetch and respect the existing Redis TTL for each
circuit's _key_active (or check existence) and only call reset_to_primary() when
the key has expired or its TTL is below a small threshold (e.g., <= probe
window) or when an actual health probe indicates recovery; update
run_auto_health_check to read the circuit's _key_active TTL (or add a
circuit.is_about_to_expire()/has_expired() helper) and only call
reset_to_primary() in those cases so fallback_duration_secs and ServiceFallback
TTL handling govern automatic resets.
Dependency PR: #647